From 794a7327da1a0905bc2b3b0f2bff34259d7bc78b Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Tue, 2 Jul 2024 15:34:49 +0000 Subject: [PATCH] Revert "Implement decompression side of table packing" to make reviewing easie This reverts commit 19590b17cac3cab59f9a068ab68e855a89cdb1b5. --- components/core/src/clp_s/ArchiveReader.cpp | 114 +++++++------------- components/core/src/clp_s/ArchiveReader.hpp | 24 +---- components/core/src/clp_s/CMakeLists.txt | 2 - components/core/src/clp_s/SchemaReader.cpp | 18 ++-- components/core/src/clp_s/SchemaReader.hpp | 19 ++-- components/core/src/clp_s/TableReader.cpp | 108 ------------------- components/core/src/clp_s/TableReader.hpp | 89 --------------- components/core/src/clp_s/search/Output.cpp | 2 +- 8 files changed, 64 insertions(+), 312 deletions(-) delete mode 100644 components/core/src/clp_s/TableReader.cpp delete mode 100644 components/core/src/clp_s/TableReader.hpp diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 3c62cc63b..f211a0707 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -27,8 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) { m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str); m_schema_map = ReaderUtils::read_schemas(archive_path_str); + m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile); m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile); - m_table_reader.open_tables(archive_path_str + constants::cArchiveTablesFile); } void ArchiveReader::read_metadata() { @@ -38,8 +38,6 @@ void ArchiveReader::read_metadata() { cDecompressorFileReadBufferCapacity ); - m_table_reader.read_metadata(m_table_metadata_decompressor); - size_t num_schemas; if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas); ErrorCodeSuccess != error) @@ -47,65 +45,39 @@ void ArchiveReader::read_metadata() { throw OperationFailed(error, __FILENAME__, __LINE__); } - bool prev_metadata_initialized{false}; - SchemaReader::SchemaMetadata prev_metadata{}; - int32_t prev_schema_id{}; - for (size_t i = 0; i < num_schemas; ++i) { - size_t table_id; - size_t table_offset; + for (size_t i = 0; i < num_schemas; i++) { int32_t schema_id; - size_t num_messages; + uint64_t num_messages; + size_t table_offset; + size_t uncompressed_size; - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_id); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (table_offset > m_table_reader.get_uncompressed_table_size(table_id)) { - throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__); - } - - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (prev_metadata_initialized) { - size_t uncompressed_size{0}; - if (table_id != prev_metadata.table_id) { - uncompressed_size - = m_table_reader.get_uncompressed_table_size(prev_metadata.table_id) - - prev_metadata.table_offset; - } else { - uncompressed_size = table_offset - prev_metadata.table_offset; - } - prev_metadata.uncompressed_size = uncompressed_size; - m_id_to_schema_metadata[prev_schema_id] = prev_metadata; - } else { - prev_metadata_initialized = true; - } - prev_metadata = {table_id, table_offset, num_messages, 0}; - prev_schema_id = schema_id; + m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size}; m_schema_ids.push_back(schema_id); } - prev_metadata.uncompressed_size - = m_table_reader.get_uncompressed_table_size(prev_metadata.table_id) - - prev_metadata.table_offset; - m_id_to_schema_metadata[prev_schema_id] = prev_metadata; m_table_metadata_decompressor.close(); } @@ -117,12 +89,14 @@ void ArchiveReader::read_dictionaries_and_metadata() { read_metadata(); } -SchemaReader& ArchiveReader::read_schema_table( +SchemaReader& ArchiveReader::read_table( int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records ) { - if (m_id_to_schema_metadata.count(schema_id) == 0) { + constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB + + if (m_id_to_table_metadata.count(schema_id) == 0) { throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__); } @@ -133,26 +107,30 @@ SchemaReader& ArchiveReader::read_schema_table( should_marshal_records ); - auto& schema_metadata = m_id_to_schema_metadata[schema_id]; - auto table_buffer = read_table(schema_metadata.table_id, true); - m_schema_reader - .load(table_buffer, schema_metadata.table_offset, schema_metadata.uncompressed_size); + m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset); + m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity); + m_schema_reader.load( + m_tables_decompressor, + m_id_to_table_metadata[schema_id].uncompressed_size + ); + m_tables_decompressor.close_for_reuse(); return m_schema_reader; } std::vector> ArchiveReader::read_all_tables() { + constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB + std::vector> readers; - readers.reserve(m_id_to_schema_metadata.size()); - for (auto schema_id : m_schema_ids) { + readers.reserve(m_id_to_table_metadata.size()); + for (auto const& [id, table_metadata] : m_id_to_table_metadata) { auto schema_reader = std::make_shared(); - initialize_schema_reader(*schema_reader, schema_id, true, true); - auto& schema_metadata = m_id_to_schema_metadata[schema_id]; - auto table_buffer = read_table(schema_metadata.table_id, false); - schema_reader->load( - table_buffer, - schema_metadata.table_offset, - schema_metadata.uncompressed_size - ); + initialize_schema_reader(*schema_reader, id, true, true); + + m_tables_file_reader.try_seek_from_begin(table_metadata.offset); + m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity); + schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size); + m_tables_decompressor.close_for_reuse(); + readers.push_back(std::move(schema_reader)); } return readers; @@ -259,7 +237,7 @@ void ArchiveReader::initialize_schema_reader( m_schema_tree, schema_id, schema.get_ordered_schema_view(), - m_id_to_schema_metadata[schema_id].num_messages, + m_id_to_table_metadata[schema_id].num_messages, should_marshal_records ); auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids(); @@ -306,8 +284,9 @@ void ArchiveReader::initialize_schema_reader( void ArchiveReader::store(FileWriter& writer) { std::string message; - for (auto schema_id : m_schema_ids) { - auto& schema_reader = read_schema_table(schema_id, false, true); + + for (auto& [id, table_metadata] : m_id_to_table_metadata) { + auto& schema_reader = read_table(id, false, true); while (schema_reader.get_next_message(message)) { writer.write(message.c_str(), message.length()); } @@ -325,28 +304,11 @@ void ArchiveReader::close() { m_array_dict->close(); m_timestamp_dict->close(); - m_table_reader.close(); + m_tables_file_reader.close(); m_table_metadata_file_reader.close(); - m_id_to_schema_metadata.clear(); + m_id_to_table_metadata.clear(); m_schema_ids.clear(); - m_cur_table_id = 0; - m_table_buffer.reset(); - m_table_buffer_size = 0ULL; } -std::shared_ptr ArchiveReader::read_table(size_t table_id, bool reuse_buffer) { - if (nullptr != m_table_buffer && m_cur_table_id == table_id) { - return m_table_buffer; - } - - if (false == reuse_buffer) { - m_table_buffer.reset(); - m_table_buffer_size = 0; - } - - m_table_reader.read_table(table_id, m_table_buffer, m_table_buffer_size); - m_cur_table_id = table_id; - return m_table_buffer; -} } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 02755592a..91fcc1a94 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -12,7 +12,6 @@ #include "DictionaryReader.hpp" #include "ReaderUtils.hpp" #include "SchemaReader.hpp" -#include "TableReader.hpp" #include "TimestampDictionaryReader.hpp" #include "Utils.hpp" @@ -92,11 +91,8 @@ class ArchiveReader { * @param should_marshal_records * @return the schema reader */ - SchemaReader& read_schema_table( - int32_t schema_id, - bool should_extract_timestamp, - bool should_marshal_records - ); + SchemaReader& + read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records); /** * Loads all of the tables in the archive and returns SchemaReaders for them. @@ -175,14 +171,6 @@ class ArchiveReader { bool should_marshal_records ); - /** - * Reads a table with given ID from the table reader. If read_table is called multiple times in - * a row for the same table_id a cached buffer is returned. This function allows the caller to - * ask for the same buffer to be reused to read multiple different tables: this can save memory - * allocations, but can only be used when tables are read one at a time. - */ - std::shared_ptr read_table(size_t table_id, bool reuse_buffer); - bool m_is_open; std::string m_archive_id; std::shared_ptr m_var_dict; @@ -193,15 +181,13 @@ class ArchiveReader { std::shared_ptr m_schema_tree; std::shared_ptr m_schema_map; std::vector m_schema_ids; - std::map m_id_to_schema_metadata; + std::map m_id_to_table_metadata; - TableReader m_table_reader; + FileReader m_tables_file_reader; FileReader m_table_metadata_file_reader; + ZstdDecompressor m_tables_decompressor; ZstdDecompressor m_table_metadata_decompressor; SchemaReader m_schema_reader; - std::shared_ptr m_table_buffer{}; - size_t m_table_buffer_size{0ULL}; - size_t m_cur_table_id{0ULL}; }; } // namespace clp_s diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index cc1fd78cc..c8cf08b22 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -78,8 +78,6 @@ set( SchemaTree.hpp SchemaWriter.cpp SchemaWriter.hpp - TableReader.cpp - TableReader.hpp TimestampDictionaryReader.cpp TimestampDictionaryReader.hpp TimestampDictionaryWriter.cpp diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index 265e772d8..03edebf69 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -37,13 +37,17 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) { } } -void SchemaReader::load( - std::shared_ptr table_buffer, - size_t offset, - size_t uncompressed_size -) { - m_table_buffer = table_buffer; - BufferViewReader buffer_reader{m_table_buffer.get() + offset, uncompressed_size}; +void SchemaReader::load(ZstdDecompressor& decompressor, size_t uncompressed_size) { + if (uncompressed_size > m_table_buffer_size) { + m_table_buffer = std::make_unique(uncompressed_size); + m_table_buffer_size = uncompressed_size; + } + auto error = decompressor.try_read_exact_length(m_table_buffer.get(), uncompressed_size); + if (ErrorCodeSuccess != error) { + throw OperationFailed(error, __FILENAME__, __LINE__); + } + + BufferViewReader buffer_reader{m_table_buffer.get(), uncompressed_size}; for (auto& reader : m_columns) { reader->load(buffer_reader, m_num_messages); } diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 300bc47c8..3639560f6 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -1,7 +1,6 @@ #ifndef CLP_S_SCHEMAREADER_HPP #define CLP_S_SCHEMAREADER_HPP -#include #include #include #include @@ -12,6 +11,7 @@ #include "FileReader.hpp" #include "JsonSerializer.hpp" #include "SchemaTree.hpp" +#include "ZstdDecompressor.hpp" namespace clp_s { class SchemaReader; @@ -47,10 +47,9 @@ class SchemaReader { : TraceableException(error_code, filename, line_number) {} }; - struct SchemaMetadata { - size_t table_id; - size_t table_offset; - size_t num_messages; + struct TableMetadata { + uint64_t num_messages; + size_t offset; size_t uncompressed_size; }; @@ -131,12 +130,11 @@ class SchemaReader { ); /** - * Loads the encoded messages from a shared buffer starting at a given offset - * @param table_buffer - * @param offset + * Loads the encoded messages + * @param decompressor * @param uncompressed_size */ - void load(std::shared_ptr table_buffer, size_t offset, size_t uncompressed_size); + void load(ZstdDecompressor& decompressor, size_t uncompressed_size); /** * Gets next message @@ -279,7 +277,8 @@ class SchemaReader { std::unordered_map m_column_map; std::vector m_columns; std::vector m_reordered_columns; - std::shared_ptr m_table_buffer; + std::unique_ptr m_table_buffer; + size_t m_table_buffer_size{0}; BaseColumnReader* m_timestamp_column; std::function m_get_timestamp; diff --git a/components/core/src/clp_s/TableReader.cpp b/components/core/src/clp_s/TableReader.cpp deleted file mode 100644 index 969824356..000000000 --- a/components/core/src/clp_s/TableReader.cpp +++ /dev/null @@ -1,108 +0,0 @@ -#include "TableReader.hpp" - -namespace clp_s { - -void TableReader::read_metadata(ZstdDecompressor& decompressor) { - switch (m_state) { - case TableReaderState::Uninitialized: - m_state = TableReaderState::MetadataRead; - break; - case TableReaderState::TablesOpened: - m_state = TableReaderState::TablesOpenedAndMetadataRead; - break; - default: - throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); - } - - size_t num_tables; - if (auto error = decompressor.try_read_numeric_value(num_tables); ErrorCodeSuccess != error) { - throw OperationFailed(error, __FILE__, __LINE__); - } - m_table_metadata.reserve(num_tables); - - for (size_t i = 0; i < num_tables; ++i) { - size_t file_offset; - size_t uncompressed_size; - - if (auto error = decompressor.try_read_numeric_value(file_offset); - ErrorCodeSuccess != error) - { - throw OperationFailed(error, __FILE__, __LINE__); - } - - if (auto error = decompressor.try_read_numeric_value(uncompressed_size); - ErrorCodeSuccess != error) - { - throw OperationFailed(error, __FILE__, __LINE__); - } - - m_table_metadata.emplace_back(file_offset, uncompressed_size); - } -} - -void TableReader::open_tables(std::string const& tables_file_path) { - switch (m_state) { - case TableReaderState::Uninitialized: - m_state = TableReaderState::TablesOpened; - break; - case TableReaderState::MetadataRead: - m_state = TableReaderState::TablesOpenedAndMetadataRead; - break; - default: - throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); - } - m_tables_reader.open(tables_file_path); -} - -void TableReader::close() { - switch (m_state) { - case TableReaderState::TablesOpened: - case TableReaderState::TablesOpenedAndMetadataRead: - case TableReaderState::ReadingTables: - break; - default: - throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); - } - m_tables_reader.close(); - m_previous_table_id = 0; - m_table_metadata.clear(); - m_state = TableReaderState::Uninitialized; -} - -void TableReader::read_table(size_t table_id, std::shared_ptr& buf, size_t& buf_size) { - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - if (table_id > m_table_metadata.size()) { - throw OperationFailed(ErrorCodeCorrupt, __FILE__, __LINE__); - } - - switch (m_state) { - case TableReaderState::TablesOpenedAndMetadataRead: - m_state = TableReaderState::ReadingTables; - break; - case TableReaderState::ReadingTables: - if (m_previous_table_id >= table_id) { - throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__); - } - break; - default: - throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); - } - m_previous_table_id = table_id; - - auto& [file_offset, uncompressed_size] = m_table_metadata[table_id]; - m_tables_reader.try_seek_from_begin(file_offset); - m_tables_decompressor.open(m_tables_reader, cDecompressorFileReadBufferCapacity); - if (buf_size < uncompressed_size) { - // make_shared is supposed to work here for c++20, but it seems like the compiler version - // we use doesn't support it, so we convert a unique_ptr to a shared_ptr instead. - buf = std::make_unique(uncompressed_size); - buf_size = uncompressed_size; - } - if (auto error = m_tables_decompressor.try_read_exact_length(buf.get(), uncompressed_size); - ErrorCodeSuccess != error) - { - throw OperationFailed(error, __FILE__, __LINE__); - } - m_tables_decompressor.close_for_reuse(); -} -} // namespace clp_s diff --git a/components/core/src/clp_s/TableReader.hpp b/components/core/src/clp_s/TableReader.hpp deleted file mode 100644 index 116d1b957..000000000 --- a/components/core/src/clp_s/TableReader.hpp +++ /dev/null @@ -1,89 +0,0 @@ -#ifndef CLP_S_TABLEREADER_HPP -#define CLP_S_TABLEREADER_HPP - -#include -#include -#include -#include - -#include "FileReader.hpp" -#include "ZstdDecompressor.hpp" - -namespace clp_s { -/** - * TableReader ensures that the tables section of an archive is read safely. Any attempt to read the - * tables section without loading the tables metadata, and any attempt to read tables section out of - * order will throw. As well, any incorrect usage of this class (e.g. closing without opening) will - * throw. - */ -class TableReader { -public: - class OperationFailed : public TraceableException { - public: - // Constructors - OperationFailed(ErrorCode error_code, char const* const filename, int line_number) - : TraceableException(error_code, filename, line_number) {} - }; - - struct TableMetadata { - size_t file_offset; - size_t uncompressed_size; - }; - - /** - * Reads table metadata from the provided compression stream. Must be invoked before reading - * tables. - */ - void read_metadata(ZstdDecompressor& decompressor); - - /** - * Opens a file reader for the tables section. Must be invoked before reading tables. - */ - void open_tables(std::string const& tables_file_path); - - /** - * Closes the file reader for the tables section. - */ - void close(); - - /** - * Decompresses a table with a given table_id and returns it. This function must be called - * strictly in ascending table_id order. If this function is called twice for the same table or - * if a table with lower id is requested after a table with higher id then an error is thrown. - * - * Note: the buffer and buffer size are returned by reference. This is to support the use case - * where the caller wants to re-use the same buffer for multiple tables to avoid allocations - * when they already have a sufficiently large buffer. If no buffer is provided or the provided - * buffer is too small calling read_table will create a buffer exactly as large as the table - * being decompressed. - * - * @param table_id - * @param buf - * @param buf_size - * @return a shared_ptr to a buffer containing the requested table - */ - void read_table(size_t table_id, std::shared_ptr& buf, size_t& buf_size); - - size_t get_uncompressed_table_size(size_t table_id) const { - return m_table_metadata.at(table_id).uncompressed_size; - } - -private: - enum TableReaderState { - Uninitialized, - MetadataRead, - TablesOpened, - TablesOpenedAndMetadataRead, - ReadingTables - }; - - std::vector m_table_metadata; - FileReader m_tables_reader; - ZstdDecompressor m_tables_decompressor; - TableReaderState m_state{TableReaderState::Uninitialized}; - size_t m_previous_table_id{0ULL}; -}; - -} // namespace clp_s - -#endif // CLP_S_TABLEREADER_HPP diff --git a/components/core/src/clp_s/search/Output.cpp b/components/core/src/clp_s/search/Output.cpp index 4d36b4e29..b6a3b8fe0 100644 --- a/components/core/src/clp_s/search/Output.cpp +++ b/components/core/src/clp_s/search/Output.cpp @@ -84,7 +84,7 @@ bool Output::filter() { add_wildcard_columns_to_searched_columns(); - auto& reader = m_archive_reader->read_schema_table( + auto& reader = m_archive_reader->read_table( schema_id, m_output_handler->should_output_metadata(), m_should_marshal_records