diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 1bdc0baae..5362d32cc 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -27,8 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) { m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str); m_schema_map = ReaderUtils::read_schemas(archive_path_str); - m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile); m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile); + m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile); } void ArchiveReader::read_metadata() { @@ -38,6 +38,20 @@ void ArchiveReader::read_metadata() { cDecompressorFileReadBufferCapacity ); + m_stream_reader.read_metadata(m_table_metadata_decompressor); + + size_t num_separate_column_schemas; + if (auto error + = m_table_metadata_decompressor.try_read_numeric_value(num_separate_column_schemas); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILENAME__, __LINE__); + } + + if (0 != num_separate_column_schemas) { + throw OperationFailed(ErrorCode::ErrorCodeUnsupported, __FILENAME__, __LINE__); + } + size_t num_schemas; if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas); ErrorCodeSuccess != error) @@ -45,39 +59,65 @@ void ArchiveReader::read_metadata() { throw OperationFailed(error, __FILENAME__, __LINE__); } - for (size_t i = 0; i < num_schemas; i++) { + bool prev_metadata_initialized{false}; + SchemaReader::SchemaMetadata prev_metadata{}; + int32_t prev_schema_id{}; + for (size_t i = 0; i < num_schemas; ++i) { + uint64_t stream_id; + uint64_t stream_offset; int32_t schema_id; uint64_t num_messages; - size_t table_offset; - size_t uncompressed_size; - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_id); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_offset); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset); + if (stream_offset > m_stream_reader.get_uncompressed_stream_size(stream_id)) { + throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__); + } + + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size}; + if (prev_metadata_initialized) { + uint64_t uncompressed_size{0}; + if (stream_id != prev_metadata.stream_id) { + uncompressed_size + = m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id) + - prev_metadata.stream_offset; + } else { + uncompressed_size = stream_offset - prev_metadata.stream_offset; + } + prev_metadata.uncompressed_size = uncompressed_size; + m_id_to_schema_metadata[prev_schema_id] = prev_metadata; + } else { + prev_metadata_initialized = true; + } + prev_metadata = {stream_id, stream_offset, num_messages, 0}; + prev_schema_id = schema_id; m_schema_ids.push_back(schema_id); } + prev_metadata.uncompressed_size + = m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id) + - prev_metadata.stream_offset; + m_id_to_schema_metadata[prev_schema_id] = prev_metadata; m_table_metadata_decompressor.close(); } @@ -89,14 +129,12 @@ void ArchiveReader::read_dictionaries_and_metadata() { read_metadata(); } -SchemaReader& ArchiveReader::read_table( +SchemaReader& ArchiveReader::read_schema_table( int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records ) { - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - - if (m_id_to_table_metadata.count(schema_id) == 0) { + if (m_id_to_schema_metadata.count(schema_id) == 0) { throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__); } @@ -107,30 +145,26 @@ SchemaReader& ArchiveReader::read_table( should_marshal_records ); - m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset); - m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity); - m_schema_reader.load( - m_tables_decompressor, - m_id_to_table_metadata[schema_id].uncompressed_size - ); - m_tables_decompressor.close_for_reuse(); + auto& schema_metadata = m_id_to_schema_metadata[schema_id]; + auto stream_buffer = read_stream(schema_metadata.stream_id, true); + m_schema_reader + .load(stream_buffer, schema_metadata.stream_offset, schema_metadata.uncompressed_size); return m_schema_reader; } std::vector> ArchiveReader::read_all_tables() { - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - std::vector> readers; - readers.reserve(m_id_to_table_metadata.size()); - for (auto const& [id, table_metadata] : m_id_to_table_metadata) { + readers.reserve(m_id_to_schema_metadata.size()); + for (auto schema_id : m_schema_ids) { auto schema_reader = std::make_shared(); - initialize_schema_reader(*schema_reader, id, true, true); - - m_tables_file_reader.try_seek_from_begin(table_metadata.offset); - m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity); - schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size); - m_tables_decompressor.close_for_reuse(); - + initialize_schema_reader(*schema_reader, schema_id, true, true); + auto& schema_metadata = m_id_to_schema_metadata[schema_id]; + auto stream_buffer = read_stream(schema_metadata.stream_id, false); + schema_reader->load( + stream_buffer, + schema_metadata.stream_offset, + schema_metadata.uncompressed_size + ); readers.push_back(std::move(schema_reader)); } return readers; @@ -238,7 +272,7 @@ void ArchiveReader::initialize_schema_reader( m_projection, schema_id, schema.get_ordered_schema_view(), - m_id_to_table_metadata[schema_id].num_messages, + m_id_to_schema_metadata[schema_id].num_messages, should_marshal_records ); auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids(); @@ -285,9 +319,8 @@ void ArchiveReader::initialize_schema_reader( void ArchiveReader::store(FileWriter& writer) { std::string message; - - for (auto& [id, table_metadata] : m_id_to_table_metadata) { - auto& schema_reader = read_table(id, false, true); + for (auto schema_id : m_schema_ids) { + auto& schema_reader = read_schema_table(schema_id, false, true); while (schema_reader.get_next_message(message)) { writer.write(message.c_str(), message.length()); } @@ -305,11 +338,28 @@ void ArchiveReader::close() { m_array_dict->close(); m_timestamp_dict->close(); - m_tables_file_reader.close(); + m_stream_reader.close(); m_table_metadata_file_reader.close(); - m_id_to_table_metadata.clear(); + m_id_to_schema_metadata.clear(); m_schema_ids.clear(); + m_cur_stream_id = 0; + m_stream_buffer.reset(); + m_stream_buffer_size = 0ULL; } +std::shared_ptr ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) { + if (nullptr != m_stream_buffer && m_cur_stream_id == stream_id) { + return m_stream_buffer; + } + + if (false == reuse_buffer) { + m_stream_buffer.reset(); + m_stream_buffer_size = 0; + } + + m_stream_reader.read_stream(stream_id, m_stream_buffer, m_stream_buffer_size); + m_cur_stream_id = stream_id; + return m_stream_buffer; +} } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 97966131f..41073ec84 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -10,6 +10,7 @@ #include #include "DictionaryReader.hpp" +#include "PackedStreamReader.hpp" #include "ReaderUtils.hpp" #include "SchemaReader.hpp" #include "search/Projection.hpp" @@ -92,8 +93,11 @@ class ArchiveReader { * @param should_marshal_records * @return the schema reader */ - SchemaReader& - read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records); + SchemaReader& read_schema_table( + int32_t schema_id, + bool should_extract_timestamp, + bool should_marshal_records + ); /** * Loads all of the tables in the archive and returns SchemaReaders for them. @@ -176,6 +180,18 @@ class ArchiveReader { bool should_marshal_records ); + /** + * Reads a table with given ID from the packed stream reader. If read_stream is called multiple + * times in a row for the same stream_id a cached buffer is returned. This function allows the + * caller to ask for the same buffer to be reused to read multiple different tables: this can + * save memory allocations, but can only be used when tables are read one at a time. + * @param stream_id + * @param reuse_buffer when true the same buffer is reused across invocations, overwriting data + * returned previous calls to read_stream + * @return a buffer containing the decompressed stream identified by stream_id + */ + std::shared_ptr read_stream(size_t stream_id, bool reuse_buffer); + bool m_is_open; std::string m_archive_id; std::shared_ptr m_var_dict; @@ -186,16 +202,18 @@ class ArchiveReader { std::shared_ptr m_schema_tree; std::shared_ptr m_schema_map; std::vector m_schema_ids; - std::map m_id_to_table_metadata; + std::map m_id_to_schema_metadata; std::shared_ptr m_projection{ std::make_shared(search::ProjectionMode::ReturnAllColumns) }; - FileReader m_tables_file_reader; + PackedStreamReader m_stream_reader; FileReader m_table_metadata_file_reader; - ZstdDecompressor m_tables_decompressor; ZstdDecompressor m_table_metadata_decompressor; SchemaReader m_schema_reader; + std::shared_ptr m_stream_buffer{}; + size_t m_stream_buffer_size{0ULL}; + size_t m_cur_stream_id{0ULL}; }; } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveWriter.cpp b/components/core/src/clp_s/ArchiveWriter.cpp index ba540a79d..369fd79d2 100644 --- a/components/core/src/clp_s/ArchiveWriter.cpp +++ b/components/core/src/clp_s/ArchiveWriter.cpp @@ -1,5 +1,7 @@ #include "ArchiveWriter.hpp" +#include + #include #include "archive_constants.hpp" @@ -11,6 +13,7 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) { m_id = boost::uuids::to_string(option.id); m_compression_level = option.compression_level; m_print_archive_stats = option.print_archive_stats; + m_min_table_size = option.min_table_size; auto archive_path = boost::filesystem::path(option.archives_dir) / m_id; boost::system::error_code boost_error_code; @@ -138,18 +141,103 @@ size_t ArchiveWriter::store_tables() { FileWriter::OpenMode::CreateForWriting ); m_table_metadata_compressor.open(m_table_metadata_file_writer, m_compression_level); - m_table_metadata_compressor.write_numeric_value(m_id_to_schema_writer.size()); - for (auto& i : m_id_to_schema_writer) { - m_table_metadata_compressor.write_numeric_value(i.first); - m_table_metadata_compressor.write_numeric_value(i.second->get_num_messages()); - m_table_metadata_compressor.write_numeric_value(m_tables_file_writer.get_pos()); - - m_tables_compressor.open(m_tables_file_writer, m_compression_level); - size_t uncompressed_size = i.second->store(m_tables_compressor); - m_tables_compressor.close(); - delete i.second; - - m_table_metadata_compressor.write_numeric_value(uncompressed_size); + + /** + * Packed stream metadata schema + * ------------------------------ + * Schema tables are packed into a series of compression streams. Each of those compression + * streams is identified by a 64 bit stream id. In the first half of the metadata we identify + * how many streams there are, and the offset into the file where each compression stream can + * be found. In the second half of the metadata we record how many schema tables there are, + * which compression stream they belong to, the offset into that compression stream where + * they can be found, and how many messages that schema table contains. + * + * Section 1: Compression Streams Metadata + * - Contains metadata about each compression stream. + * - Structure: + * - Number of packed streams: <64-bit integer> + * - For each stream: + * - Offset into the file: <64-bit integer> + * - Uncompressed size: <64-bit integer> + * - Number of separate column schemas: <64-bit integer> + * It is always 0 in the current implementation. + * - Undefined section for separate column schemas, reserved for future support. + * + * Section 2: Schema Tables Metadata + * - Contains metadata about schema tables associated with each compression stream. + * - Structure: + * - Number of schema tables: <64-bit integer> + * - For each schema table: + * - Stream ID: <64-bit integer> + * - Offset into the stream: <64-bit integer> + * - Schema ID: <32-bit integer> + * - Number of messages: <64-bit integer> + * + * We buffer the first half of the metadata in the "stream_metadata" vector, and the second half + * of the metadata in the "schema_metadata" vector as we compress the tables. The metadata is + * flushed once all of the schema tables have been compressed. + */ + using schema_map_it = decltype(m_id_to_schema_writer)::iterator; + std::vector schemas; + std::vector stream_metadata; + std::vector schema_metadata; + + schema_metadata.reserve(m_id_to_schema_writer.size()); + schemas.reserve(m_id_to_schema_writer.size()); + for (auto it = m_id_to_schema_writer.begin(); it != m_id_to_schema_writer.end(); ++it) { + schemas.push_back(it); + } + auto comp = [](schema_map_it const& lhs, schema_map_it const& rhs) -> bool { + return lhs->second->get_total_uncompressed_size() + > rhs->second->get_total_uncompressed_size(); + }; + std::sort(schemas.begin(), schemas.end(), comp); + + uint64_t current_stream_offset = 0; + uint64_t current_stream_id = 0; + uint64_t current_table_file_offset = 0; + m_tables_compressor.open(m_tables_file_writer, m_compression_level); + for (auto it : schemas) { + it->second->store(m_tables_compressor); + schema_metadata.emplace_back( + current_stream_id, + current_stream_offset, + it->first, + it->second->get_num_messages() + ); + current_stream_offset += it->second->get_total_uncompressed_size(); + delete it->second; + + if (current_stream_offset > m_min_table_size || schemas.size() == schema_metadata.size()) { + stream_metadata.emplace_back(current_table_file_offset, current_stream_offset); + m_tables_compressor.close(); + current_stream_offset = 0; + ++current_stream_id; + current_table_file_offset = m_tables_file_writer.get_pos(); + + if (schemas.size() != schema_metadata.size()) { + m_tables_compressor.open(m_tables_file_writer, m_compression_level); + } + } + } + + m_table_metadata_compressor.write_numeric_value(stream_metadata.size()); + for (auto& stream : stream_metadata) { + m_table_metadata_compressor.write_numeric_value(stream.file_offset); + m_table_metadata_compressor.write_numeric_value(stream.uncompressed_size); + } + + // The current implementation doesn't store large tables as separate columns, so this is always + // zero. + size_t const num_separate_column_schemas{0}; + m_table_metadata_compressor.write_numeric_value(num_separate_column_schemas); + + m_table_metadata_compressor.write_numeric_value(schema_metadata.size()); + for (auto& schema : schema_metadata) { + m_table_metadata_compressor.write_numeric_value(schema.stream_id); + m_table_metadata_compressor.write_numeric_value(schema.stream_offset); + m_table_metadata_compressor.write_numeric_value(schema.schema_id); + m_table_metadata_compressor.write_numeric_value(schema.num_messages); } m_table_metadata_compressor.close(); diff --git a/components/core/src/clp_s/ArchiveWriter.hpp b/components/core/src/clp_s/ArchiveWriter.hpp index 70eb5dc9b..7edfe4491 100644 --- a/components/core/src/clp_s/ArchiveWriter.hpp +++ b/components/core/src/clp_s/ArchiveWriter.hpp @@ -21,6 +21,7 @@ struct ArchiveWriterOption { std::string archives_dir; int compression_level; bool print_archive_stats; + size_t min_table_size; }; class ArchiveWriter { @@ -32,6 +33,33 @@ class ArchiveWriter { : TraceableException(error_code, filename, line_number) {} }; + struct StreamMetadata { + StreamMetadata(uint64_t file_offset, uint64_t uncompressed_size) + : file_offset(file_offset), + uncompressed_size(uncompressed_size) {} + + uint64_t file_offset{}; + uint64_t uncompressed_size{}; + }; + + struct SchemaMetadata { + SchemaMetadata( + uint64_t stream_id, + uint64_t stream_offset, + int32_t schema_id, + uint64_t num_messages + ) + : stream_id(stream_id), + stream_offset(stream_offset), + schema_id(schema_id), + num_messages(num_messages) {} + + uint64_t stream_id{}; + uint64_t stream_offset{}; + int32_t schema_id{}; + uint64_t num_messages{}; + }; + // Constructor explicit ArchiveWriter(std::shared_ptr metadata_db) : m_metadata_db(std::move(metadata_db)) {} @@ -159,6 +187,7 @@ class ArchiveWriter { std::shared_ptr m_metadata_db; int m_compression_level{}; bool m_print_archive_stats{}; + size_t m_min_table_size{}; SchemaMap m_schema_map; SchemaTree m_schema_tree; diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index 625972de7..1656a5d59 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -65,6 +65,8 @@ set( JsonParser.cpp JsonParser.hpp JsonSerializer.hpp + PackedStreamReader.cpp + PackedStreamReader.hpp ParsedMessage.hpp ReaderUtils.cpp ReaderUtils.hpp diff --git a/components/core/src/clp_s/ColumnWriter.cpp b/components/core/src/clp_s/ColumnWriter.cpp index 183f17e57..77fa51f15 100644 --- a/components/core/src/clp_s/ColumnWriter.cpp +++ b/components/core/src/clp_s/ColumnWriter.cpp @@ -1,41 +1,37 @@ #include "ColumnWriter.hpp" namespace clp_s { -void Int64ColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { - size = sizeof(int64_t); +size_t Int64ColumnWriter::add_value(ParsedMessage::variable_t& value) { m_values.push_back(std::get(value)); + return sizeof(int64_t); } -size_t Int64ColumnWriter::store(ZstdCompressor& compressor) { +void Int64ColumnWriter::store(ZstdCompressor& compressor) { size_t size = m_values.size() * sizeof(int64_t); compressor.write(reinterpret_cast(m_values.data()), size); - return size; } -void FloatColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { - size = sizeof(double); +size_t FloatColumnWriter::add_value(ParsedMessage::variable_t& value) { m_values.push_back(std::get(value)); + return sizeof(double); } -size_t FloatColumnWriter::store(ZstdCompressor& compressor) { +void FloatColumnWriter::store(ZstdCompressor& compressor) { size_t size = m_values.size() * sizeof(double); compressor.write(reinterpret_cast(m_values.data()), size); - return size; } -void BooleanColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { - size = sizeof(uint8_t); +size_t BooleanColumnWriter::add_value(ParsedMessage::variable_t& value) { m_values.push_back(std::get(value) ? 1 : 0); + return sizeof(uint8_t); } -size_t BooleanColumnWriter::store(ZstdCompressor& compressor) { +void BooleanColumnWriter::store(ZstdCompressor& compressor) { size_t size = m_values.size() * sizeof(uint8_t); compressor.write(reinterpret_cast(m_values.data()), size); - return size; } -void ClpStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { - size = sizeof(int64_t); +size_t ClpStringColumnWriter::add_value(ParsedMessage::variable_t& value) { std::string string_var = std::get(value); uint64_t id; uint64_t offset = m_encoded_vars.size(); @@ -48,45 +44,43 @@ void ClpStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& m_log_dict->add_entry(m_logtype_entry, id); auto encoded_id = encode_log_dict_id(id, offset); m_logtypes.push_back(encoded_id); - size += sizeof(int64_t) * (m_encoded_vars.size() - offset); + return sizeof(int64_t) + sizeof(int64_t) * (m_encoded_vars.size() - offset); } -size_t ClpStringColumnWriter::store(ZstdCompressor& compressor) { +void ClpStringColumnWriter::store(ZstdCompressor& compressor) { size_t logtypes_size = m_logtypes.size() * sizeof(int64_t); compressor.write(reinterpret_cast(m_logtypes.data()), logtypes_size); size_t encoded_vars_size = m_encoded_vars.size() * sizeof(int64_t); size_t num_encoded_vars = m_encoded_vars.size(); compressor.write_numeric_value(num_encoded_vars); compressor.write(reinterpret_cast(m_encoded_vars.data()), encoded_vars_size); - return logtypes_size + sizeof(num_encoded_vars) + encoded_vars_size; } -void VariableStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { - size = sizeof(int64_t); +size_t VariableStringColumnWriter::add_value(ParsedMessage::variable_t& value) { std::string string_var = std::get(value); uint64_t id; m_var_dict->add_entry(string_var, id); m_variables.push_back(id); + return sizeof(int64_t); } -size_t VariableStringColumnWriter::store(ZstdCompressor& compressor) { +void VariableStringColumnWriter::store(ZstdCompressor& compressor) { size_t size = m_variables.size() * sizeof(int64_t); compressor.write(reinterpret_cast(m_variables.data()), size); - return size; } -void DateStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { - size = 2 * sizeof(int64_t); +size_t DateStringColumnWriter::add_value(ParsedMessage::variable_t& value) { auto encoded_timestamp = std::get>(value); m_timestamps.push_back(encoded_timestamp.second); m_timestamp_encodings.push_back(encoded_timestamp.first); + return 2 * sizeof(int64_t); + ; } -size_t DateStringColumnWriter::store(ZstdCompressor& compressor) { +void DateStringColumnWriter::store(ZstdCompressor& compressor) { size_t timestamps_size = m_timestamps.size() * sizeof(int64_t); compressor.write(reinterpret_cast(m_timestamps.data()), timestamps_size); size_t encodings_size = m_timestamp_encodings.size() * sizeof(int64_t); compressor.write(reinterpret_cast(m_timestamp_encodings.data()), encodings_size); - return timestamps_size + encodings_size; } } // namespace clp_s diff --git a/components/core/src/clp_s/ColumnWriter.hpp b/components/core/src/clp_s/ColumnWriter.hpp index ce458381e..7282cf7ea 100644 --- a/components/core/src/clp_s/ColumnWriter.hpp +++ b/components/core/src/clp_s/ColumnWriter.hpp @@ -27,16 +27,24 @@ class BaseColumnWriter { /** * Adds a value to the column * @param value - * @param size + * @return the size of the encoded data appended to this column in bytes */ - virtual void add_value(ParsedMessage::variable_t& value, size_t& size) = 0; + virtual size_t add_value(ParsedMessage::variable_t& value) = 0; /** * Stores the column to a compressed file. * @param compressor - * @return the in-memory uncompressed size of the data written to the compressor */ - virtual size_t store(ZstdCompressor& compressor) = 0; + virtual void store(ZstdCompressor& compressor) = 0; + + /** + * Returns the total size of the header data that will be written to the compressor. This header + * size plus the sum of sizes returned by add_value is equal to the total size of data that will + * be written to the compressor in bytes. + * + * @return the total size of header data that will be written to the compressor in bytes + */ + virtual size_t get_total_header_size() const { return 0; } protected: int32_t m_id; @@ -51,9 +59,9 @@ class Int64ColumnWriter : public BaseColumnWriter { ~Int64ColumnWriter() override = default; // Methods inherited from BaseColumnWriter - void add_value(ParsedMessage::variable_t& value, size_t& size) override; + size_t add_value(ParsedMessage::variable_t& value) override; - size_t store(ZstdCompressor& compressor) override; + void store(ZstdCompressor& compressor) override; private: std::vector m_values; @@ -68,9 +76,9 @@ class FloatColumnWriter : public BaseColumnWriter { ~FloatColumnWriter() override = default; // Methods inherited from BaseColumnWriter - void add_value(ParsedMessage::variable_t& value, size_t& size) override; + size_t add_value(ParsedMessage::variable_t& value) override; - size_t store(ZstdCompressor& compressor) override; + void store(ZstdCompressor& compressor) override; private: std::vector m_values; @@ -85,9 +93,9 @@ class BooleanColumnWriter : public BaseColumnWriter { ~BooleanColumnWriter() override = default; // Methods inherited from BaseColumnWriter - void add_value(ParsedMessage::variable_t& value, size_t& size) override; + size_t add_value(ParsedMessage::variable_t& value) override; - size_t store(ZstdCompressor& compressor) override; + void store(ZstdCompressor& compressor) override; private: std::vector m_values; @@ -109,9 +117,11 @@ class ClpStringColumnWriter : public BaseColumnWriter { ~ClpStringColumnWriter() override = default; // Methods inherited from BaseColumnWriter - void add_value(ParsedMessage::variable_t& value, size_t& size) override; + size_t add_value(ParsedMessage::variable_t& value) override; + + void store(ZstdCompressor& compressor) override; - size_t store(ZstdCompressor& compressor) override; + size_t get_total_header_size() const override { return sizeof(size_t); } /** * @param encoded_id @@ -163,9 +173,9 @@ class VariableStringColumnWriter : public BaseColumnWriter { ~VariableStringColumnWriter() override = default; // Methods inherited from BaseColumnWriter - void add_value(ParsedMessage::variable_t& value, size_t& size) override; + size_t add_value(ParsedMessage::variable_t& value) override; - size_t store(ZstdCompressor& compressor) override; + void store(ZstdCompressor& compressor) override; private: std::shared_ptr m_var_dict; @@ -181,9 +191,9 @@ class DateStringColumnWriter : public BaseColumnWriter { ~DateStringColumnWriter() override = default; // Methods inherited from BaseColumnWriter - void add_value(ParsedMessage::variable_t& value, size_t& size) override; + size_t add_value(ParsedMessage::variable_t& value) override; - size_t store(ZstdCompressor& compressor) override; + void store(ZstdCompressor& compressor) override; private: std::vector m_timestamps; diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 553f17c39..cf69a066c 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -160,6 +160,11 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { default_value(m_target_encoded_size), "Target size (B) for the dictionaries and encoded messages before a new " "archive is created." + )( + "min-table-size", + po::value(&m_minimum_table_size)->value_name("MIN_TABLE_SIZE")-> + default_value(m_minimum_table_size), + "Minimum size (B) for a packed table before it gets compressed." )( "max-document-size", po::value(&m_max_document_size)->value_name("DOC_SIZE")-> diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 030ff8b99..798e42728 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -108,6 +108,8 @@ class CommandLineArguments { size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; } + size_t get_minimum_table_size() const { return m_minimum_table_size; } + std::vector const& get_projection_columns() const { return m_projection_columns; } private: @@ -175,6 +177,7 @@ class CommandLineArguments { bool m_structurize_arrays{false}; bool m_ordered_decompression{false}; size_t m_ordered_chunk_size{0}; + size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB // Metadata db variables std::optional m_metadata_db_config; diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index a68062958..7d4af1469 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -31,6 +31,7 @@ JsonParser::JsonParser(JsonParserOption const& option) m_archive_options.archives_dir = option.archives_dir; m_archive_options.compression_level = option.compression_level; m_archive_options.print_archive_stats = option.print_archive_stats; + m_archive_options.min_table_size = option.min_table_size; m_archive_options.id = m_generator(); m_archive_writer = std::make_unique(option.metadata_db); diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index 84aa27fef..af6b024ef 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp @@ -32,6 +32,7 @@ struct JsonParserOption { std::string archives_dir; size_t target_encoded_size; size_t max_document_size; + size_t min_table_size; int compression_level; bool print_archive_stats; bool structurize_arrays; diff --git a/components/core/src/clp_s/PackedStreamReader.cpp b/components/core/src/clp_s/PackedStreamReader.cpp new file mode 100644 index 000000000..44eb94e96 --- /dev/null +++ b/components/core/src/clp_s/PackedStreamReader.cpp @@ -0,0 +1,117 @@ +#include "PackedStreamReader.hpp" + +namespace clp_s { + +void PackedStreamReader::read_metadata(ZstdDecompressor& decompressor) { + switch (m_state) { + case PackedStreamReaderState::Uninitialized: + m_state = PackedStreamReaderState::MetadataRead; + break; + case PackedStreamReaderState::PackedStreamsOpened: + m_state = PackedStreamReaderState::PackedStreamsOpenedAndMetadataRead; + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + + size_t num_streams; + if (auto error = decompressor.try_read_numeric_value(num_streams); ErrorCodeSuccess != error) { + throw OperationFailed(error, __FILE__, __LINE__); + } + m_stream_metadata.reserve(num_streams); + + for (size_t i = 0; i < num_streams; ++i) { + size_t file_offset; + size_t uncompressed_size; + + if (auto error = decompressor.try_read_numeric_value(file_offset); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILE__, __LINE__); + } + + if (auto error = decompressor.try_read_numeric_value(uncompressed_size); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILE__, __LINE__); + } + + m_stream_metadata.emplace_back(file_offset, uncompressed_size); + } +} + +void PackedStreamReader::open_packed_streams(std::string const& tables_file_path) { + switch (m_state) { + case PackedStreamReaderState::Uninitialized: + m_state = PackedStreamReaderState::PackedStreamsOpened; + break; + case PackedStreamReaderState::MetadataRead: + m_state = PackedStreamReaderState::PackedStreamsOpenedAndMetadataRead; + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + m_packed_stream_reader.open(tables_file_path); +} + +void PackedStreamReader::close() { + switch (m_state) { + case PackedStreamReaderState::PackedStreamsOpened: + case PackedStreamReaderState::PackedStreamsOpenedAndMetadataRead: + case PackedStreamReaderState::ReadingPackedStreams: + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + m_packed_stream_reader.close(); + m_prev_stream_id = 0; + m_stream_metadata.clear(); + m_state = PackedStreamReaderState::Uninitialized; +} + +void PackedStreamReader::read_stream( + size_t stream_id, + std::shared_ptr& buf, + size_t& buf_size +) { + constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB + if (stream_id >= m_stream_metadata.size()) { + throw OperationFailed(ErrorCodeCorrupt, __FILE__, __LINE__); + } + + switch (m_state) { + case PackedStreamReaderState::PackedStreamsOpenedAndMetadataRead: + m_state = PackedStreamReaderState::ReadingPackedStreams; + break; + case PackedStreamReaderState::ReadingPackedStreams: + if (m_prev_stream_id >= stream_id) { + throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__); + } + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + m_prev_stream_id = stream_id; + + auto& [file_offset, uncompressed_size] = m_stream_metadata[stream_id]; + if (auto error = m_packed_stream_reader.try_seek_from_begin(file_offset); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILE__, __LINE__); + } + m_packed_stream_decompressor.open(m_packed_stream_reader, cDecompressorFileReadBufferCapacity); + if (buf_size < uncompressed_size) { + // make_shared is supposed to work here for c++20, but it seems like the compiler version + // we use doesn't support it, so we convert a unique_ptr to a shared_ptr instead. + buf = std::make_unique(uncompressed_size); + buf_size = uncompressed_size; + } + if (auto error + = m_packed_stream_decompressor.try_read_exact_length(buf.get(), uncompressed_size); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILE__, __LINE__); + } + m_packed_stream_decompressor.close_for_reuse(); +} +} // namespace clp_s diff --git a/components/core/src/clp_s/PackedStreamReader.hpp b/components/core/src/clp_s/PackedStreamReader.hpp new file mode 100644 index 000000000..d9f9af58f --- /dev/null +++ b/components/core/src/clp_s/PackedStreamReader.hpp @@ -0,0 +1,97 @@ +#ifndef CLP_S_PACKEDSTREAMREADER_HPP +#define CLP_S_PACKEDSTREAMREADER_HPP + +#include +#include +#include +#include + +#include "FileReader.hpp" +#include "ZstdDecompressor.hpp" + +namespace clp_s { +/** + * PackedStreamReader ensures that the tables section of an archive is read safely. Any attempt to + * read the tables section without loading the tables metadata, and any attempt to read tables + * section out of order will throw. As well, any incorrect usage of this class (e.g. closing without + * opening) will throw. + */ +class PackedStreamReader { +public: + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + }; + + struct PackedStreamMetadata { + PackedStreamMetadata(size_t offset, size_t size) + : file_offset(offset), + uncompressed_size(size) {} + + size_t file_offset; + size_t uncompressed_size; + }; + + /** + * Reads packed stream metadata from the provided compression stream. Must be invoked before + * reading packed streams. + * @param decompressor an open ZstdDecompressor pointing to the packed stream metadata + */ + void read_metadata(ZstdDecompressor& decompressor); + + /** + * Opens a file reader for the tables section. Must be invoked before reading packed streams. + * @param tables_file_path the path to the tables file for the archive being read + */ + void open_packed_streams(std::string const& tables_file_path); + + /** + * Closes the file reader for the tables section. + */ + void close(); + + /** + * Decompresses a stream with a given stream_id and returns it. This function must be called + * strictly in ascending stream_id order. If this function is called twice for the same stream + * or if a stream with lower id is requested after a stream with higher id then an error is + * thrown. + * + * Note: the buffer and buffer size are returned by reference. This is to support the use case + * where the caller wants to re-use the same buffer for multiple streams to avoid allocations + * when they already have a sufficiently large buffer. If no buffer is provided or the provided + * buffer is too small calling read_stream will create a buffer exactly as large as the stream + * being decompressed. + * + * @param stream_id + * @param buf a shared ptr to the buffer where the stream will be read. The buffer gets resized + * if it is too small to contain the requested stream. + * @param buf_size the size of the underlying buffer owned by buf -- passed and updated by + * reference + */ + void read_stream(size_t stream_id, std::shared_ptr& buf, size_t& buf_size); + + [[nodiscard]] size_t get_uncompressed_stream_size(size_t stream_id) const { + return m_stream_metadata.at(stream_id).uncompressed_size; + } + +private: + enum PackedStreamReaderState { + Uninitialized, + MetadataRead, + PackedStreamsOpened, + PackedStreamsOpenedAndMetadataRead, + ReadingPackedStreams + }; + + std::vector m_stream_metadata; + FileReader m_packed_stream_reader; + ZstdDecompressor m_packed_stream_decompressor; + PackedStreamReaderState m_state{PackedStreamReaderState::Uninitialized}; + size_t m_prev_stream_id{0ULL}; +}; + +} // namespace clp_s + +#endif // CLP_S_PACKEDSTREAMREADER_HPP diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index d2c7739da..35c1d0a6e 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -37,17 +37,13 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) { } } -void SchemaReader::load(ZstdDecompressor& decompressor, size_t uncompressed_size) { - if (uncompressed_size > m_table_buffer_size) { - m_table_buffer = std::make_unique(uncompressed_size); - m_table_buffer_size = uncompressed_size; - } - auto error = decompressor.try_read_exact_length(m_table_buffer.get(), uncompressed_size); - if (ErrorCodeSuccess != error) { - throw OperationFailed(error, __FILENAME__, __LINE__); - } - - BufferViewReader buffer_reader{m_table_buffer.get(), uncompressed_size}; +void SchemaReader::load( + std::shared_ptr stream_buffer, + size_t offset, + size_t uncompressed_size +) { + m_stream_buffer = stream_buffer; + BufferViewReader buffer_reader{m_stream_buffer.get() + offset, uncompressed_size}; for (auto& reader : m_columns) { reader->load(buffer_reader, m_num_messages); } diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index d72b2c3b6..08651cc39 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -1,6 +1,7 @@ #ifndef CLP_S_SCHEMAREADER_HPP #define CLP_S_SCHEMAREADER_HPP +#include #include #include #include @@ -48,10 +49,11 @@ class SchemaReader { : TraceableException(error_code, filename, line_number) {} }; - struct TableMetadata { + struct SchemaMetadata { + uint64_t stream_id; + uint64_t stream_offset; uint64_t num_messages; - size_t offset; - size_t uncompressed_size; + uint64_t uncompressed_size; }; // Constructor @@ -134,11 +136,12 @@ class SchemaReader { ); /** - * Loads the encoded messages - * @param decompressor + * Loads the encoded messages from a shared buffer starting at a given offset + * @param stream_buffer + * @param offset * @param uncompressed_size */ - void load(ZstdDecompressor& decompressor, size_t uncompressed_size); + void load(std::shared_ptr stream_buffer, size_t offset, size_t uncompressed_size); /** * Gets next message @@ -281,8 +284,7 @@ class SchemaReader { std::unordered_map m_column_map; std::vector m_columns; std::vector m_reordered_columns; - std::unique_ptr m_table_buffer; - size_t m_table_buffer_size{0}; + std::shared_ptr m_stream_buffer; BaseColumnReader* m_timestamp_column; std::function m_get_timestamp; diff --git a/components/core/src/clp_s/SchemaWriter.cpp b/components/core/src/clp_s/SchemaWriter.cpp index b6de15f2f..ae3f16f15 100644 --- a/components/core/src/clp_s/SchemaWriter.cpp +++ b/components/core/src/clp_s/SchemaWriter.cpp @@ -4,35 +4,32 @@ namespace clp_s { void SchemaWriter::append_column(BaseColumnWriter* column_writer) { + m_total_uncompressed_size += column_writer->get_total_header_size(); m_columns.push_back(column_writer); } size_t SchemaWriter::append_message(ParsedMessage& message) { - int count = 0; - size_t size, total_size; - size = total_size = 0; + int count{}; + size_t total_size{}; for (auto& i : message.get_content()) { - m_columns[count]->add_value(i.second, size); - total_size += size; - count++; + total_size += m_columns[count]->add_value(i.second); + ++count; } for (auto& i : message.get_unordered_content()) { - m_columns[count]->add_value(i, size); - total_size += size; + total_size += m_columns[count]->add_value(i); ++count; } m_num_messages++; + m_total_uncompressed_size += total_size; return total_size; } -size_t SchemaWriter::store(ZstdCompressor& compressor) { - size_t total_size = 0; +void SchemaWriter::store(ZstdCompressor& compressor) { for (auto& writer : m_columns) { - total_size += writer->store(compressor); + writer->store(compressor); } - return total_size; } SchemaWriter::~SchemaWriter() { diff --git a/components/core/src/clp_s/SchemaWriter.hpp b/components/core/src/clp_s/SchemaWriter.hpp index 41b28600b..4f204d949 100644 --- a/components/core/src/clp_s/SchemaWriter.hpp +++ b/components/core/src/clp_s/SchemaWriter.hpp @@ -40,20 +40,19 @@ class SchemaWriter { /** * Stores the columns to disk. * @param compressor - * @return the uncompressed in-memory size of the table */ - [[nodiscard]] size_t store(ZstdCompressor& compressor); + void store(ZstdCompressor& compressor); + + uint64_t get_num_messages() const { return m_num_messages; } /** - * Closes the schema writer. - * @return the compressed size of the schema table in bytes + * @return the uncompressed in-memory size of the data that will be written to the compressor */ - [[nodiscard]] size_t close(); - - uint64_t get_num_messages() const { return m_num_messages; } + size_t get_total_uncompressed_size() const { return m_total_uncompressed_size; } private: uint64_t m_num_messages; + size_t m_total_uncompressed_size{}; std::vector m_columns; std::vector m_unordered_columns; diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 8e37ca769..5f4384a1c 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -91,6 +91,7 @@ bool compress(CommandLineArguments const& command_line_arguments) { option.archives_dir = archives_dir.string(); option.target_encoded_size = command_line_arguments.get_target_encoded_size(); option.max_document_size = command_line_arguments.get_max_document_size(); + option.min_table_size = command_line_arguments.get_minimum_table_size(); option.compression_level = command_line_arguments.get_compression_level(); option.timestamp_key = command_line_arguments.get_timestamp_key(); option.print_archive_stats = command_line_arguments.print_archive_stats(); diff --git a/components/core/src/clp_s/search/Output.cpp b/components/core/src/clp_s/search/Output.cpp index b6a3b8fe0..4d36b4e29 100644 --- a/components/core/src/clp_s/search/Output.cpp +++ b/components/core/src/clp_s/search/Output.cpp @@ -84,7 +84,7 @@ bool Output::filter() { add_wildcard_columns_to_searched_columns(); - auto& reader = m_archive_reader->read_table( + auto& reader = m_archive_reader->read_schema_table( schema_id, m_output_handler->should_output_metadata(), m_should_marshal_records