diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index baecce359..424d4cd0f 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -459,5 +459,5 @@ target_link_libraries(unitTest ZStd::ZStd ) target_compile_features(unitTest - PRIVATE cxx_std_17 + PRIVATE cxx_std_20 ) diff --git a/components/core/src/clp/clg/CMakeLists.txt b/components/core/src/clp/clg/CMakeLists.txt index b19712f7b..c83b6e0f4 100644 --- a/components/core/src/clp/clg/CMakeLists.txt +++ b/components/core/src/clp/clg/CMakeLists.txt @@ -119,7 +119,7 @@ set( ) add_executable(clg ${CLG_SOURCES}) -target_compile_features(clg PRIVATE cxx_std_17) +target_compile_features(clg PRIVATE cxx_std_20) target_include_directories(clg PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(clg PRIVATE diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index f5f6f99b7..880808a44 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -138,7 +138,7 @@ set( ) add_executable(clo ${CLO_SOURCES} ${REDUCER_SOURCES}) -target_compile_features(clo PRIVATE cxx_std_17) +target_compile_features(clo PRIVATE cxx_std_20) target_include_directories(clo PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(clo PRIVATE diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index dc1a9038a..937c62ac4 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -153,7 +153,7 @@ set( ) add_executable(clp ${CLP_SOURCES}) -target_compile_features(clp PRIVATE cxx_std_17) +target_compile_features(clp PRIVATE cxx_std_20) target_include_directories(clp PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(clp PRIVATE diff --git a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt 
index b880d3c63..6dc5334bf 100644 --- a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt +++ b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt @@ -37,7 +37,7 @@ set( ) add_executable(make-dictionaries-readable ${MAKE_DICTIONARIES_READABLE_SOURCES}) -target_compile_features(make-dictionaries-readable PRIVATE cxx_std_17) +target_compile_features(make-dictionaries-readable PRIVATE cxx_std_20) target_include_directories(make-dictionaries-readable PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(make-dictionaries-readable PRIVATE diff --git a/components/core/src/clp/string_utils/CMakeLists.txt b/components/core/src/clp/string_utils/CMakeLists.txt index bbfde63ea..3759938e5 100644 --- a/components/core/src/clp/string_utils/CMakeLists.txt +++ b/components/core/src/clp/string_utils/CMakeLists.txt @@ -9,4 +9,4 @@ add_library( ) add_library(clp::string_utils ALIAS string_utils) target_include_directories(string_utils PUBLIC ../) -target_compile_features(string_utils PRIVATE cxx_std_17) +target_compile_features(string_utils PRIVATE cxx_std_20) diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index b692f7b72..d4a193349 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -41,6 +41,7 @@ void ArchiveReader::read_metadata() { int32_t schema_id; uint64_t num_messages; size_t table_offset; + size_t uncompressed_size; if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id); ErrorCodeSuccess != error) @@ -60,7 +61,13 @@ void ArchiveReader::read_metadata() { throw OperationFailed(error, __FILENAME__, __LINE__); } - m_id_to_table_metadata[schema_id] = {num_messages, table_offset}; + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILENAME__, __LINE__); + } + + m_id_to_table_metadata[schema_id] = 
{num_messages, table_offset, uncompressed_size}; m_schema_ids.push_back(schema_id); } m_table_metadata_decompressor.close(); @@ -74,7 +81,7 @@ void ArchiveReader::read_dictionaries_and_metadata() { read_metadata(); } -std::unique_ptr ArchiveReader::read_table( +SchemaReader& ArchiveReader::read_table( int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records @@ -85,93 +92,156 @@ std::unique_ptr ArchiveReader::read_table( throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__); } - auto schema_reader + auto& schema_reader = create_schema_reader(schema_id, should_extract_timestamp, should_marshal_records); m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset); m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity); - schema_reader->load(m_tables_decompressor); - m_tables_decompressor.close(); + schema_reader.load(m_tables_decompressor, m_id_to_table_metadata[schema_id].uncompressed_size); + m_tables_decompressor.close_for_reuse(); return schema_reader; } -BaseColumnReader* -ArchiveReader::append_reader_column(std::unique_ptr& reader, int32_t column_id) { +BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int32_t column_id) { BaseColumnReader* column_reader = nullptr; - auto node = m_schema_tree->get_node(column_id); - std::string key_name = node->get_key_name(); - switch (node->get_type()) { - case NodeType::INTEGER: - column_reader = new Int64ColumnReader(key_name, column_id); - break; - case NodeType::FLOAT: - column_reader = new FloatColumnReader(key_name, column_id); + auto const& node = m_schema_tree->get_node(column_id); + switch (node.get_type()) { + case NodeType::Integer: + column_reader = new Int64ColumnReader(column_id); break; - case NodeType::CLPSTRING: - column_reader = new ClpStringColumnReader(key_name, column_id, m_var_dict, m_log_dict); + case NodeType::Float: + column_reader = new FloatColumnReader(column_id); break; - case 
NodeType::VARSTRING: - column_reader = new VariableStringColumnReader(key_name, column_id, m_var_dict); + case NodeType::ClpString: + column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict); break; - case NodeType::BOOLEAN: - column_reader = new BooleanColumnReader(key_name, column_id); + case NodeType::VarString: + column_reader = new VariableStringColumnReader(column_id, m_var_dict); break; - case NodeType::ARRAY: - column_reader = new ClpStringColumnReader( - key_name, - column_id, - m_var_dict, - m_array_dict, - true - ); + case NodeType::Boolean: + column_reader = new BooleanColumnReader(column_id); break; - case NodeType::DATESTRING: - column_reader = new DateStringColumnReader(key_name, column_id, m_timestamp_dict); + case NodeType::UnstructuredArray: + column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_array_dict, true); break; - case NodeType::OBJECT: - case NodeType::NULLVALUE: - reader->append_column(column_id); + case NodeType::DateString: + column_reader = new DateStringColumnReader(column_id, m_timestamp_dict); break; - case NodeType::UNKNOWN: + // No need to push columns without associated object readers into the SchemaReader. 
+ case NodeType::Object: + case NodeType::NullValue: + case NodeType::Unknown: break; } if (column_reader) { - reader->append_column(column_reader); + reader.append_column(column_reader); } return column_reader; } -std::unique_ptr ArchiveReader::create_schema_reader( +void ArchiveReader::append_unordered_reader_columns( + SchemaReader& reader, + NodeType unordered_object_type, + std::span schema_ids, + bool should_marshal_records +) { + int32_t mst_subtree_root_node_id = INT32_MAX; + size_t object_begin_pos = reader.get_column_size(); + for (int32_t column_id : schema_ids) { + if (Schema::schema_entry_is_unordered_object(column_id)) { + continue; + } + BaseColumnReader* column_reader = nullptr; + auto const& node = m_schema_tree->get_node(column_id); + if (INT32_MAX == mst_subtree_root_node_id) { + mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree( + -1, + column_id, + unordered_object_type + ); + } + switch (node.get_type()) { + case NodeType::Integer: + column_reader = new Int64ColumnReader(column_id); + break; + case NodeType::Float: + column_reader = new FloatColumnReader(column_id); + break; + case NodeType::ClpString: + column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict); + break; + case NodeType::VarString: + column_reader = new VariableStringColumnReader(column_id, m_var_dict); + break; + case NodeType::Boolean: + column_reader = new BooleanColumnReader(column_id); + break; + // UnstructuredArray and DateString currently aren't supported as part of any unordered + // object, so we disregard them here + case NodeType::UnstructuredArray: + case NodeType::DateString: + // No need to push columns without associated object readers into the SchemaReader. 
+ case NodeType::Object: + case NodeType::NullValue: + case NodeType::Unknown: + break; + } + + if (column_reader) { + reader.append_unordered_column(column_reader); + } + } + + if (should_marshal_records) { + reader.mark_unordered_object(object_begin_pos, mst_subtree_root_node_id, schema_ids); + } +} + +SchemaReader& ArchiveReader::create_schema_reader( int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records ) { - auto reader = std::make_unique( + auto& schema = (*m_schema_map)[schema_id]; + m_schema_reader.reset( m_schema_tree, schema_id, + schema.get_ordered_schema_view(), m_id_to_table_metadata[schema_id].num_messages, should_marshal_records ); auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids(); - for (int32_t column_id : (*m_schema_map)[reader->get_schema_id()]) { - BaseColumnReader* column_reader = append_reader_column(reader, column_id); + for (size_t i = 0; i < schema.size(); ++i) { + int32_t column_id = schema[i]; + if (Schema::schema_entry_is_unordered_object(column_id)) { + size_t length = Schema::get_unordered_object_length(column_id); + append_unordered_reader_columns( + m_schema_reader, + Schema::get_unordered_object_type(column_id), + schema.get_view(i + 1, length), + should_marshal_records + ); + i += length; + continue; + } + BaseColumnReader* column_reader = append_reader_column(m_schema_reader, column_id); if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0) { - reader->mark_column_as_timestamp(column_reader); + m_schema_reader.mark_column_as_timestamp(column_reader); } } - return reader; + return m_schema_reader; } void ArchiveReader::store(FileWriter& writer) { std::string message; for (auto& [id, table_metadata] : m_id_to_table_metadata) { - auto schema_reader = read_table(id, false, true); - while (schema_reader->get_next_message(message)) { + auto& schema_reader = read_table(id, false, true); + while (schema_reader.get_next_message(message)) { 
writer.write(message.c_str(), message.length()); } } diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index f26fc7d4f..6ce881e91 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -11,6 +12,7 @@ #include "ReaderUtils.hpp" #include "SchemaReader.hpp" #include "TimestampDictionaryReader.hpp" +#include "Utils.hpp" namespace clp_s { class ArchiveReader { @@ -87,7 +89,7 @@ class ArchiveReader { * @param should_marshal_records * @return the schema reader */ - std::unique_ptr + SchemaReader& read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records); std::shared_ptr get_variable_dictionary() { return m_var_dict; } @@ -127,8 +129,9 @@ class ArchiveReader { * @param schema_id * @param should_extract_timestamp * @param should_marshal_records + * @return a reference to the newly created schema reader initialized with the given parameters */ - std::unique_ptr create_schema_reader( + SchemaReader& create_schema_reader( int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records @@ -138,9 +141,24 @@ class ArchiveReader { * Appends a column to the schema reader. * @param reader * @param column_id + * @return a pointer to the newly appended column reader or nullptr if no column reader was + * created */ - BaseColumnReader* - append_reader_column(std::unique_ptr& reader, int32_t column_id); + BaseColumnReader* append_reader_column(SchemaReader& reader, int32_t column_id); + + /** + * Appends columns for the entire schema of an unordered object. 
+ * @param reader + * @param unordered_object_type + * @param schema_ids + * @param should_marshal_records + */ + void append_unordered_reader_columns( + SchemaReader& reader, + NodeType unordered_object_type, + std::span schema_ids, + bool should_marshal_records + ); bool m_is_open; std::string m_archive_path; @@ -159,6 +177,7 @@ class ArchiveReader { FileReader m_table_metadata_file_reader; ZstdDecompressor m_tables_decompressor; ZstdDecompressor m_table_metadata_decompressor; + SchemaReader m_schema_reader; }; } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveWriter.cpp b/components/core/src/clp_s/ArchiveWriter.cpp index 7d4a4c9e2..11ac278ca 100644 --- a/components/core/src/clp_s/ArchiveWriter.cpp +++ b/components/core/src/clp_s/ArchiveWriter.cpp @@ -92,32 +92,35 @@ size_t ArchiveWriter::get_data_size() { void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& schema) { for (int32_t id : schema) { - auto node = m_schema_tree.get_node(id); - switch (node->get_type()) { - case NodeType::INTEGER: + if (Schema::schema_entry_is_unordered_object(id)) { + continue; + } + auto const& node = m_schema_tree.get_node(id); + switch (node.get_type()) { + case NodeType::Integer: writer->append_column(new Int64ColumnWriter(id)); break; - case NodeType::FLOAT: + case NodeType::Float: writer->append_column(new FloatColumnWriter(id)); break; - case NodeType::CLPSTRING: + case NodeType::ClpString: writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_log_dict)); break; - case NodeType::VARSTRING: + case NodeType::VarString: writer->append_column(new VariableStringColumnWriter(id, m_var_dict)); break; - case NodeType::BOOLEAN: + case NodeType::Boolean: writer->append_column(new BooleanColumnWriter(id)); break; - case NodeType::ARRAY: + case NodeType::UnstructuredArray: writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_array_dict)); break; - case NodeType::DATESTRING: + case NodeType::DateString: 
writer->append_column(new DateStringColumnWriter(id)); break; - case NodeType::OBJECT: - case NodeType::NULLVALUE: - case NodeType::UNKNOWN: + case NodeType::Object: + case NodeType::NullValue: + case NodeType::Unknown: break; } } @@ -141,9 +144,11 @@ size_t ArchiveWriter::store_tables() { m_table_metadata_compressor.write_numeric_value(m_tables_file_writer.get_pos()); m_tables_compressor.open(m_tables_file_writer, m_compression_level); - i.second->store(m_tables_compressor); + size_t uncompressed_size = i.second->store(m_tables_compressor); m_tables_compressor.close(); delete i.second; + + m_table_metadata_compressor.write_numeric_value(uncompressed_size); } m_table_metadata_compressor.close(); diff --git a/components/core/src/clp_s/BufferViewReader.hpp b/components/core/src/clp_s/BufferViewReader.hpp new file mode 100644 index 000000000..ca6e675ea --- /dev/null +++ b/components/core/src/clp_s/BufferViewReader.hpp @@ -0,0 +1,58 @@ +#ifndef CLP_S_BUFFER_VIEW_READER_HPP +#define CLP_S_BUFFER_VIEW_READER_HPP + +#include "TraceableException.hpp" +#include "Utils.hpp" + +namespace clp_s { +/** + * BufferViewReader is a utility class designed to let several readers safely share views + * into a shared buffer. Readers can consume from the buffer either by consuming spans and scalar + * values from the buffer. Any attempt to consume past the end of the buffer will throw. 
+ * @throws OperationFailed + */ +class BufferViewReader { +public: + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + }; + + explicit BufferViewReader(char* buffer, size_t size) + : m_buffer(buffer), + m_remaining_size(size) {} + + template + T read_value() { + if (m_remaining_size < sizeof(T)) { + throw OperationFailed(ErrorCodeOutOfBounds, __FILENAME__, __LINE__); + } + T tmp; + memcpy(&tmp, m_buffer, sizeof(T)); + m_buffer += sizeof(T); + m_remaining_size -= sizeof(T); + return tmp; + } + + template + UnalignedMemSpan read_unaligned_span(size_t length) { + size_t span_length_in_bytes = sizeof(T) * length; + if (m_remaining_size < span_length_in_bytes) { + throw OperationFailed(ErrorCodeOutOfBounds, __FILENAME__, __LINE__); + } + UnalignedMemSpan tmp{m_buffer, length}; + m_buffer += span_length_in_bytes; + m_remaining_size -= span_length_in_bytes; + return tmp; + } + + size_t get_remaining_size() { return m_remaining_size; } + +private: + char* m_buffer{}; + size_t m_remaining_size{}; +}; +} // namespace clp_s +#endif // CLP_S_BUFFER_VIEW_READER_HPP diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index 5b537701d..c8cf08b22 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -38,6 +38,7 @@ set( ArchiveReader.hpp ArchiveWriter.cpp ArchiveWriter.hpp + BufferViewReader.hpp ColumnReader.cpp ColumnReader.hpp ColumnWriter.cpp @@ -185,7 +186,7 @@ add_executable( ${CLP_S_SEARCH_SOURCES} ${REDUCER_SOURCES} ) -target_compile_features(clp-s PRIVATE cxx_std_17) +target_compile_features(clp-s PRIVATE cxx_std_20) target_link_libraries( clp-s PRIVATE diff --git a/components/core/src/clp_s/ColumnReader.cpp b/components/core/src/clp_s/ColumnReader.cpp index 3b3f12391..ded7a906d 100644 --- 
a/components/core/src/clp_s/ColumnReader.cpp +++ b/components/core/src/clp_s/ColumnReader.cpp @@ -1,17 +1,12 @@ #include "ColumnReader.hpp" +#include "BufferViewReader.hpp" #include "ColumnWriter.hpp" -#include "Utils.hpp" #include "VariableDecoder.hpp" namespace clp_s { -void Int64ColumnReader::load(ZstdDecompressor& decompressor, uint64_t num_messages) { - m_values = std::make_unique(num_messages); - - decompressor.try_read_exact_length( - reinterpret_cast(m_values.get()), - num_messages * sizeof(int64_t) - ); +void Int64ColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { + m_values = reader.read_unaligned_span(num_messages); } std::variant Int64ColumnReader::extract_value( @@ -20,6 +15,10 @@ std::variant Int64ColumnReader::extract_v return m_values[cur_message]; } +void FloatColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { + m_values = reader.read_unaligned_span(num_messages); +} + void Int64ColumnReader::extract_string_value_into_buffer( uint64_t cur_message, std::string& buffer @@ -27,21 +26,16 @@ void Int64ColumnReader::extract_string_value_into_buffer( buffer.append(std::to_string(m_values[cur_message])); } -void FloatColumnReader::load(ZstdDecompressor& decompressor, uint64_t num_messages) { - m_values = std::make_unique(num_messages); - - decompressor.try_read_exact_length( - reinterpret_cast(m_values.get()), - num_messages * sizeof(double) - ); -} - std::variant FloatColumnReader::extract_value( uint64_t cur_message ) { return m_values[cur_message]; } +void BooleanColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { + m_values = reader.read_unaligned_span(num_messages); +} + void FloatColumnReader::extract_string_value_into_buffer( uint64_t cur_message, std::string& buffer @@ -49,21 +43,18 @@ void FloatColumnReader::extract_string_value_into_buffer( buffer.append(std::to_string(m_values[cur_message])); } -void BooleanColumnReader::load(ZstdDecompressor& decompressor, uint64_t num_messages) { - 
m_values = std::make_unique(num_messages); - - decompressor.try_read_exact_length( - reinterpret_cast(m_values.get()), - num_messages * sizeof(uint8_t) - ); -} - std::variant BooleanColumnReader::extract_value( uint64_t cur_message ) { return m_values[cur_message]; } +void ClpStringColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { + m_logtypes = reader.read_unaligned_span(num_messages); + size_t encoded_vars_length = reader.read_value(); + m_encoded_vars = reader.read_unaligned_span(encoded_vars_length); +} + void BooleanColumnReader::extract_string_value_into_buffer( uint64_t cur_message, std::string& buffer @@ -71,27 +62,6 @@ void BooleanColumnReader::extract_string_value_into_buffer( buffer.append(0 == m_values[cur_message] ? "false" : "true"); } -void ClpStringColumnReader::load(ZstdDecompressor& decompressor, uint64_t num_messages) { - size_t encoded_vars_length; - - m_logtypes = std::make_unique(num_messages); - decompressor.try_read_exact_length( - reinterpret_cast(m_logtypes.get()), - num_messages * sizeof(int64_t) - ); - - auto error_code = decompressor.try_read_numeric_value(encoded_vars_length); - if (ErrorCodeSuccess != error_code) { - throw OperationFailed(error_code, __FILENAME__, __LINE__); - } - - m_encoded_vars = std::make_unique(encoded_vars_length); - decompressor.try_read_exact_length( - reinterpret_cast(m_encoded_vars.get()), - encoded_vars_length * sizeof(int64_t) - ); -} - std::variant ClpStringColumnReader::extract_value( uint64_t cur_message ) { @@ -113,7 +83,7 @@ void ClpStringColumnReader::extract_string_value_into_buffer( } int64_t encoded_vars_offset = ClpStringColumnWriter::get_encoded_offset(value); - Span encoded_vars(&m_encoded_vars[encoded_vars_offset], entry.get_num_vars()); + auto encoded_vars = m_encoded_vars.sub_span(encoded_vars_offset, entry.get_num_vars()); VariableDecoder::decode_variables_into_message(entry, *m_var_dict, encoded_vars, buffer); } @@ -123,9 +93,9 @@ int64_t 
ClpStringColumnReader::get_encoded_id(uint64_t cur_message) { return ClpStringColumnWriter::get_encoded_log_dict_id(value); } -Span ClpStringColumnReader::get_encoded_vars(uint64_t cur_message) { +UnalignedMemSpan ClpStringColumnReader::get_encoded_vars(uint64_t cur_message) { auto value = m_logtypes[cur_message]; - int64_t logtype_id = ClpStringColumnWriter::get_encoded_log_dict_id(value); + auto logtype_id = ClpStringColumnWriter::get_encoded_log_dict_id(value); auto& entry = m_log_dict->get_entry(logtype_id); // It should be initialized before because we are searching on this field @@ -135,15 +105,11 @@ Span ClpStringColumnReader::get_encoded_vars(uint64_t cur_message) { int64_t encoded_vars_offset = ClpStringColumnWriter::get_encoded_offset(value); - return {&m_encoded_vars[encoded_vars_offset], entry.get_num_vars()}; + return m_encoded_vars.sub_span(encoded_vars_offset, entry.get_num_vars()); } -void VariableStringColumnReader::load(ZstdDecompressor& decompressor, uint64_t num_messages) { - m_variables = std::make_unique(num_messages); - decompressor.try_read_exact_length( - reinterpret_cast(m_variables.get()), - num_messages * sizeof(int64_t) - ); +void VariableStringColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { + m_variables = reader.read_unaligned_span(num_messages); } std::variant VariableStringColumnReader::extract_value( @@ -163,18 +129,9 @@ int64_t VariableStringColumnReader::get_variable_id(uint64_t cur_message) { return m_variables[cur_message]; } -void DateStringColumnReader::load(ZstdDecompressor& decompressor, uint64_t num_messages) { - m_timestamps = std::make_unique(num_messages); - m_timestamp_encodings = std::make_unique(num_messages); - - decompressor.try_read_exact_length( - reinterpret_cast(m_timestamps.get()), - num_messages * sizeof(int64_t) - ); - decompressor.try_read_exact_length( - reinterpret_cast(m_timestamp_encodings.get()), - num_messages * sizeof(int64_t) - ); +void 
DateStringColumnReader::load(BufferViewReader& reader, uint64_t num_messages) { + m_timestamps = reader.read_unaligned_span(num_messages); + m_timestamp_encodings = reader.read_unaligned_span(num_messages); } std::variant DateStringColumnReader::extract_value( diff --git a/components/core/src/clp_s/ColumnReader.hpp b/components/core/src/clp_s/ColumnReader.hpp index c0002d68a..69995e6de 100644 --- a/components/core/src/clp_s/ColumnReader.hpp +++ b/components/core/src/clp_s/ColumnReader.hpp @@ -4,6 +4,7 @@ #include #include +#include "BufferViewReader.hpp" #include "DictionaryReader.hpp" #include "SchemaTree.hpp" #include "TimestampDictionaryReader.hpp" @@ -21,23 +22,21 @@ class BaseColumnReader { }; // Constructor - BaseColumnReader(std::string name, int32_t id) : m_name(std::move(name)), m_id(id) {} + BaseColumnReader(int32_t id) : m_id(id) {} // Destructor virtual ~BaseColumnReader() = default; /** - * Reads the column from the disk - * @param decompressor + * Reads the column from a shared buffer. 
+ * @param buffer * @param num_messages */ - virtual void load(ZstdDecompressor& decompressor, uint64_t num_messages) = 0; - - std::string get_name() const { return m_name; } + virtual void load(BufferViewReader& reader, uint64_t num_messages) = 0; int32_t get_id() const { return m_id; } - virtual NodeType get_type() { return NodeType::UNKNOWN; } + virtual NodeType get_type() { return NodeType::Unknown; } /** * Extracts a value of the column @@ -55,23 +54,21 @@ class BaseColumnReader { virtual void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) = 0; private: - std::string m_name; int32_t m_id; }; class Int64ColumnReader : public BaseColumnReader { public: // Constructor - explicit Int64ColumnReader(std::string name, int32_t id) - : BaseColumnReader(std::move(name), id) {} + explicit Int64ColumnReader(int32_t id) : BaseColumnReader(id) {} // Destructor ~Int64ColumnReader() override = default; // Methods inherited from BaseColumnReader - void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; + void load(BufferViewReader& reader, uint64_t num_messages) override; - NodeType get_type() override { return NodeType::INTEGER; } + NodeType get_type() override { return NodeType::Integer; } std::variant extract_value(uint64_t cur_message ) override; @@ -79,22 +76,21 @@ class Int64ColumnReader : public BaseColumnReader { void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override; private: - std::unique_ptr m_values; + UnalignedMemSpan m_values; }; class FloatColumnReader : public BaseColumnReader { public: // Constructor - explicit FloatColumnReader(std::string name, int32_t id) - : BaseColumnReader(std::move(name), id) {} + explicit FloatColumnReader(int32_t id) : BaseColumnReader(id) {} // Destructor ~FloatColumnReader() override = default; // Methods inherited from BaseColumnReader - void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; + void load(BufferViewReader& reader, 
uint64_t num_messages) override; - NodeType get_type() override { return NodeType::FLOAT; } + NodeType get_type() override { return NodeType::Float; } std::variant extract_value(uint64_t cur_message ) override; @@ -102,22 +98,21 @@ class FloatColumnReader : public BaseColumnReader { void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override; private: - std::unique_ptr m_values; + UnalignedMemSpan m_values; }; class BooleanColumnReader : public BaseColumnReader { public: // Constructor - explicit BooleanColumnReader(std::string name, int32_t id) - : BaseColumnReader(std::move(name), id) {} + explicit BooleanColumnReader(int32_t id) : BaseColumnReader(id) {} // Destructor ~BooleanColumnReader() override = default; // Methods inherited from BaseColumnReader - void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; + void load(BufferViewReader& reader, uint64_t num_messages) override; - NodeType get_type() override { return NodeType::BOOLEAN; } + NodeType get_type() override { return NodeType::Boolean; } std::variant extract_value(uint64_t cur_message ) override; @@ -125,20 +120,19 @@ class BooleanColumnReader : public BaseColumnReader { void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override; private: - std::unique_ptr m_values; + UnalignedMemSpan m_values; }; class ClpStringColumnReader : public BaseColumnReader { public: // Constructor ClpStringColumnReader( - std::string const& name, int32_t id, std::shared_ptr var_dict, std::shared_ptr log_dict, bool is_array = false ) - : BaseColumnReader(name, id), + : BaseColumnReader(id), m_var_dict(std::move(var_dict)), m_log_dict(std::move(log_dict)), m_is_array(is_array) /*, encoded_vars_index_(0)*/ {} @@ -147,9 +141,11 @@ class ClpStringColumnReader : public BaseColumnReader { ~ClpStringColumnReader() override = default; // Methods inherited from BaseColumnReader - void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; + 
void load(BufferViewReader& reader, uint64_t num_messages) override; - NodeType get_type() override { return m_is_array ? NodeType::ARRAY : NodeType::CLPSTRING; } + NodeType get_type() override { + return m_is_array ? NodeType::UnstructuredArray : NodeType::ClpString; + } std::variant extract_value(uint64_t cur_message ) override; @@ -168,15 +164,14 @@ class ClpStringColumnReader : public BaseColumnReader { * @param cur_message * @return Encoded variables in a span */ - Span get_encoded_vars(uint64_t cur_message); + UnalignedMemSpan get_encoded_vars(uint64_t cur_message); private: std::shared_ptr m_var_dict; std::shared_ptr m_log_dict; - std::unique_ptr m_logtypes; - std::unique_ptr m_encoded_vars; - // size_t encoded_vars_index_; + UnalignedMemSpan m_logtypes; + UnalignedMemSpan m_encoded_vars; bool m_is_array; }; @@ -184,21 +179,17 @@ class ClpStringColumnReader : public BaseColumnReader { class VariableStringColumnReader : public BaseColumnReader { public: // Constructor - VariableStringColumnReader( - std::string const& name, - int32_t id, - std::shared_ptr var_dict - ) - : BaseColumnReader(name, id), + VariableStringColumnReader(int32_t id, std::shared_ptr var_dict) + : BaseColumnReader(id), m_var_dict(std::move(var_dict)) {} // Destructor ~VariableStringColumnReader() override = default; // Methods inherited from BaseColumnReader - void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; + void load(BufferViewReader& reader, uint64_t num_messages) override; - NodeType get_type() override { return NodeType::VARSTRING; } + NodeType get_type() override { return NodeType::VarString; } std::variant extract_value(uint64_t cur_message ) override; @@ -215,27 +206,23 @@ class VariableStringColumnReader : public BaseColumnReader { private: std::shared_ptr m_var_dict; - std::unique_ptr m_variables; + UnalignedMemSpan m_variables; }; class DateStringColumnReader : public BaseColumnReader { public: // Constructor - DateStringColumnReader( - std::string 
const& name, - int32_t id, - std::shared_ptr timestamp_dict - ) - : BaseColumnReader(name, id), + DateStringColumnReader(int32_t id, std::shared_ptr timestamp_dict) + : BaseColumnReader(id), m_timestamp_dict(std::move(timestamp_dict)) {} // Destructor ~DateStringColumnReader() override = default; // Methods inherited from BaseColumnReader - void load(ZstdDecompressor& decompressor, uint64_t num_messages) override; + void load(BufferViewReader& reader, uint64_t num_messages) override; - NodeType get_type() override { return NodeType::DATESTRING; } + NodeType get_type() override { return NodeType::DateString; } std::variant extract_value(uint64_t cur_message ) override; @@ -251,8 +238,8 @@ class DateStringColumnReader : public BaseColumnReader { private: std::shared_ptr m_timestamp_dict; - std::unique_ptr m_timestamps; - std::unique_ptr m_timestamp_encodings; + UnalignedMemSpan m_timestamps; + UnalignedMemSpan m_timestamp_encodings; }; } // namespace clp_s diff --git a/components/core/src/clp_s/ColumnWriter.cpp b/components/core/src/clp_s/ColumnWriter.cpp index 23e548a3c..183f17e57 100644 --- a/components/core/src/clp_s/ColumnWriter.cpp +++ b/components/core/src/clp_s/ColumnWriter.cpp @@ -6,11 +6,10 @@ void Int64ColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size m_values.push_back(std::get(value)); } -void Int64ColumnWriter::store(ZstdCompressor& compressor) { - compressor.write( - reinterpret_cast(m_values.data()), - m_values.size() * sizeof(int64_t) - ); +size_t Int64ColumnWriter::store(ZstdCompressor& compressor) { + size_t size = m_values.size() * sizeof(int64_t); + compressor.write(reinterpret_cast(m_values.data()), size); + return size; } void FloatColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { @@ -18,11 +17,10 @@ void FloatColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size m_values.push_back(std::get(value)); } -void FloatColumnWriter::store(ZstdCompressor& compressor) { - compressor.write( - 
reinterpret_cast(m_values.data()), - m_values.size() * sizeof(double) - ); +size_t FloatColumnWriter::store(ZstdCompressor& compressor) { + size_t size = m_values.size() * sizeof(double); + compressor.write(reinterpret_cast(m_values.data()), size); + return size; } void BooleanColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { @@ -30,11 +28,10 @@ void BooleanColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& si m_values.push_back(std::get(value) ? 1 : 0); } -void BooleanColumnWriter::store(ZstdCompressor& compressor) { - compressor.write( - reinterpret_cast(m_values.data()), - m_values.size() * sizeof(uint8_t) - ); +size_t BooleanColumnWriter::store(ZstdCompressor& compressor) { + size_t size = m_values.size() * sizeof(uint8_t); + compressor.write(reinterpret_cast(m_values.data()), size); + return size; } void ClpStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { @@ -54,16 +51,14 @@ void ClpStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size += sizeof(int64_t) * (m_encoded_vars.size() - offset); } -void ClpStringColumnWriter::store(ZstdCompressor& compressor) { - compressor.write( - reinterpret_cast(m_logtypes.data()), - m_logtypes.size() * sizeof(int64_t) - ); - compressor.write_numeric_value(m_encoded_vars.size()); - compressor.write( - reinterpret_cast(m_encoded_vars.data()), - m_encoded_vars.size() * sizeof(int64_t) - ); +size_t ClpStringColumnWriter::store(ZstdCompressor& compressor) { + size_t logtypes_size = m_logtypes.size() * sizeof(int64_t); + compressor.write(reinterpret_cast(m_logtypes.data()), logtypes_size); + size_t encoded_vars_size = m_encoded_vars.size() * sizeof(int64_t); + size_t num_encoded_vars = m_encoded_vars.size(); + compressor.write_numeric_value(num_encoded_vars); + compressor.write(reinterpret_cast(m_encoded_vars.data()), encoded_vars_size); + return logtypes_size + sizeof(num_encoded_vars) + encoded_vars_size; } void 
VariableStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { @@ -74,11 +69,10 @@ void VariableStringColumnWriter::add_value(ParsedMessage::variable_t& value, siz m_variables.push_back(id); } -void VariableStringColumnWriter::store(ZstdCompressor& compressor) { - compressor.write( - reinterpret_cast(m_variables.data()), - m_variables.size() * sizeof(int64_t) - ); +size_t VariableStringColumnWriter::store(ZstdCompressor& compressor) { + size_t size = m_variables.size() * sizeof(int64_t); + compressor.write(reinterpret_cast(m_variables.data()), size); + return size; } void DateStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) { @@ -88,14 +82,11 @@ void DateStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& m_timestamp_encodings.push_back(encoded_timestamp.first); } -void DateStringColumnWriter::store(ZstdCompressor& compressor) { - compressor.write( - reinterpret_cast(m_timestamps.data()), - m_timestamps.size() * sizeof(int64_t) - ); - compressor.write( - reinterpret_cast(m_timestamp_encodings.data()), - m_timestamp_encodings.size() * sizeof(int64_t) - ); +size_t DateStringColumnWriter::store(ZstdCompressor& compressor) { + size_t timestamps_size = m_timestamps.size() * sizeof(int64_t); + compressor.write(reinterpret_cast(m_timestamps.data()), timestamps_size); + size_t encodings_size = m_timestamp_encodings.size() * sizeof(int64_t); + compressor.write(reinterpret_cast(m_timestamp_encodings.data()), encodings_size); + return timestamps_size + encodings_size; } } // namespace clp_s diff --git a/components/core/src/clp_s/ColumnWriter.hpp b/components/core/src/clp_s/ColumnWriter.hpp index 5546bf282..ce458381e 100644 --- a/components/core/src/clp_s/ColumnWriter.hpp +++ b/components/core/src/clp_s/ColumnWriter.hpp @@ -32,10 +32,11 @@ class BaseColumnWriter { virtual void add_value(ParsedMessage::variable_t& value, size_t& size) = 0; /** - * Stores the column to a compressed file + * Stores the 
column to a compressed file. * @param compressor + * @return the in-memory uncompressed size of the data written to the compressor */ - virtual void store(ZstdCompressor& compressor) = 0; + virtual size_t store(ZstdCompressor& compressor) = 0; protected: int32_t m_id; @@ -52,7 +53,7 @@ class Int64ColumnWriter : public BaseColumnWriter { // Methods inherited from BaseColumnWriter void add_value(ParsedMessage::variable_t& value, size_t& size) override; - void store(ZstdCompressor& compressor) override; + size_t store(ZstdCompressor& compressor) override; private: std::vector m_values; @@ -69,7 +70,7 @@ class FloatColumnWriter : public BaseColumnWriter { // Methods inherited from BaseColumnWriter void add_value(ParsedMessage::variable_t& value, size_t& size) override; - void store(ZstdCompressor& compressor) override; + size_t store(ZstdCompressor& compressor) override; private: std::vector m_values; @@ -86,7 +87,7 @@ class BooleanColumnWriter : public BaseColumnWriter { // Methods inherited from BaseColumnWriter void add_value(ParsedMessage::variable_t& value, size_t& size) override; - void store(ZstdCompressor& compressor) override; + size_t store(ZstdCompressor& compressor) override; private: std::vector m_values; @@ -110,7 +111,7 @@ class ClpStringColumnWriter : public BaseColumnWriter { // Methods inherited from BaseColumnWriter void add_value(ParsedMessage::variable_t& value, size_t& size) override; - void store(ZstdCompressor& compressor) override; + size_t store(ZstdCompressor& compressor) override; /** * @param encoded_id @@ -164,7 +165,7 @@ class VariableStringColumnWriter : public BaseColumnWriter { // Methods inherited from BaseColumnWriter void add_value(ParsedMessage::variable_t& value, size_t& size) override; - void store(ZstdCompressor& compressor) override; + size_t store(ZstdCompressor& compressor) override; private: std::shared_ptr m_var_dict; @@ -182,7 +183,7 @@ class DateStringColumnWriter : public BaseColumnWriter { // Methods inherited from 
BaseColumnWriter void add_value(ParsedMessage::variable_t& value, size_t& size) override; - void store(ZstdCompressor& compressor) override; + size_t store(ZstdCompressor& compressor) override; private: std::vector m_timestamps; diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index 6922c83d8..8bc942864 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -80,7 +80,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s switch (line.type()) { case ondemand::json_type::object: { node_id = m_archive_writer - ->add_node(node_id_stack.top(), NodeType::OBJECT, cur_key); + ->add_node(node_id_stack.top(), NodeType::Object, cur_key); object_stack.push(std::move(line.get_object())); auto objref = object_stack.top(); auto it = ondemand::object_iterator(objref.begin()); @@ -96,7 +96,11 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s } case ondemand::json_type::array: { std::string value = std::string(std::string_view(simdjson::to_json_string(line))); - node_id = m_archive_writer->add_node(node_id_stack.top(), NodeType::ARRAY, cur_key); + node_id = m_archive_writer->add_node( + node_id_stack.top(), + NodeType::UnstructuredArray, + cur_key + ); m_current_parsed_message.add_value(node_id, value); m_current_schema.insert_ordered(node_id); break; @@ -107,13 +111,13 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s if (false == number_value.is_double()) { // FIXME: should have separate integer and unsigned // integer types to handle values greater than max int64 - type = NodeType::INTEGER; + type = NodeType::Integer; } else { - type = NodeType::FLOAT; + type = NodeType::Float; } node_id = m_archive_writer->add_node(node_id_stack.top(), type, cur_key); - if (type == NodeType::INTEGER) { + if (type == NodeType::Integer) { int64_t i64_value; if (number_value.is_uint64()) { i64_value = 
static_cast(number_value.get_uint64()); @@ -147,7 +151,7 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s if (matches_timestamp) { node_id = m_archive_writer->add_node( node_id_stack.top(), - NodeType::DATESTRING, + NodeType::DateString, cur_key ); uint64_t encoding_id{0}; @@ -161,11 +165,11 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s matches_timestamp = may_match_timestamp = can_match_timestamp = false; } else if (value.find(' ') != std::string::npos) { node_id = m_archive_writer - ->add_node(node_id_stack.top(), NodeType::CLPSTRING, cur_key); + ->add_node(node_id_stack.top(), NodeType::ClpString, cur_key); m_current_parsed_message.add_value(node_id, value); } else { node_id = m_archive_writer - ->add_node(node_id_stack.top(), NodeType::VARSTRING, cur_key); + ->add_node(node_id_stack.top(), NodeType::VarString, cur_key); m_current_parsed_message.add_value(node_id, value); } @@ -175,15 +179,14 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s case ondemand::json_type::boolean: { bool value = line.get_bool(); node_id = m_archive_writer - ->add_node(node_id_stack.top(), NodeType::BOOLEAN, cur_key); - + ->add_node(node_id_stack.top(), NodeType::Boolean, cur_key); m_current_parsed_message.add_value(node_id, value); m_current_schema.insert_ordered(node_id); break; } case ondemand::json_type::null: { node_id = m_archive_writer - ->add_node(node_id_stack.top(), NodeType::NULLVALUE, cur_key); + ->add_node(node_id_stack.top(), NodeType::NullValue, cur_key); m_current_schema.insert_ordered(node_id); break; } @@ -250,7 +253,7 @@ bool JsonParser::parse() { m_archive_writer->close(); return false; } - parse_line(ref.value(), -1, "root"); + parse_line(ref.value(), -1, ""); m_num_messages++; int32_t current_schema_id = m_archive_writer->add_schema(m_current_schema); diff --git a/components/core/src/clp_s/JsonSerializer.hpp b/components/core/src/clp_s/JsonSerializer.hpp 
index e4a18eeb5..13a122922 100644 --- a/components/core/src/clp_s/JsonSerializer.hpp +++ b/components/core/src/clp_s/JsonSerializer.hpp @@ -17,22 +17,39 @@ class JsonSerializer { AddStringField, AddArrayField, AddNullField, + AddIntValue, + AddFloatValue, + AddBoolValue, + AddStringValue, + AddNullValue }; static int64_t const cReservedLength = 4096; - explicit JsonSerializer(int64_t reserved_length = cReservedLength) : m_special_keys_index(0) { + explicit JsonSerializer(int64_t reserved_length = cReservedLength) { m_json_string.reserve(cReservedLength); } std::string& get_serialized_string() { return m_json_string; } + /** + * Resets the JsonSerializer for the next record. + */ void reset() { m_json_string.clear(); m_op_list_index = 0; m_special_keys_index = 0; } + /** + * Clears the contents of the JsonSerializer to make room for a new set of operations. + */ + void clear() { + reset(); + m_op_list.clear(); + m_special_keys.clear(); + } + void add_op(Op op) { m_op_list.push_back(op); } std::vector& get_op_list() { return m_op_list; } @@ -92,8 +109,8 @@ class JsonSerializer { std::vector m_op_list; std::vector m_special_keys; - size_t m_op_list_index; - size_t m_special_keys_index; + size_t m_op_list_index{0}; + size_t m_special_keys_index{0}; }; #endif // CLP_S_JSONSERIALIZER_HPP diff --git a/components/core/src/clp_s/ParsedMessage.hpp b/components/core/src/clp_s/ParsedMessage.hpp index 4715441f1..c843e2b7b 100644 --- a/components/core/src/clp_s/ParsedMessage.hpp +++ b/components/core/src/clp_s/ParsedMessage.hpp @@ -24,42 +24,35 @@ class ParsedMessage { void set_id(int32_t schema_id) { m_schema_id = schema_id; } /** - * Adds an int64_t value to the message for a given MST node ID. + * Adds a value to the message for a given MST node ID. + * @tparam T * @param node_id * @param value */ - inline void add_value(int32_t node_id, int64_t value) { m_message.emplace(node_id, value); } - - /** - * Adds a double value to the message for a given MST node ID. 
- * @param node_id - * @param value - */ - inline void add_value(int32_t node_id, double value) { m_message.emplace(node_id, value); } - - /** - * Adds a string value to the message for a given MST node ID. - * @param node_id - * @param value - */ - inline void add_value(int32_t node_id, std::string const& value) { + template + inline void add_value(int32_t node_id, T const& value) { m_message.emplace(node_id, value); } /** - * Adds a boolean value to the message for a given MST node ID. + * Adds a timestamp value and its encoding to the message for a given MST node ID. * @param node_id + * @param encoding_id * @param value */ - inline void add_value(int32_t node_id, bool value) { m_message.emplace(node_id, value); } + inline void add_value(int32_t node_id, uint64_t encoding_id, epochtime_t value) { + m_message.emplace(node_id, std::make_pair(encoding_id, value)); + } /** - * Adds a timestamp value and its encoding to the message for a given MST node ID. - * @param node_id + * Adds a value to the unordered region of the message. The order in which unordered values are + * added to the message must match the order in which the corresponding MST node IDs are added + * to the unordered region of the schema. 
* @param value */ - inline void add_value(int32_t node_id, uint64_t encoding_id, epochtime_t value) { - m_message.emplace(node_id, std::make_pair(encoding_id, value)); + template + inline void add_unordered_value(T const& value) { + m_unordered_message.emplace_back(value); } /** @@ -68,6 +61,7 @@ class ParsedMessage { void clear() { m_schema_id = -1; m_message.clear(); + m_unordered_message.clear(); } /** @@ -75,9 +69,15 @@ class ParsedMessage { */ std::map& get_content() { return m_message; } + /** + * @return the unordered content of the message + */ + std::vector& get_unordered_content() { return m_unordered_message; } + private: int32_t m_schema_id; std::map m_message; + std::vector m_unordered_message; }; } // namespace clp_s diff --git a/components/core/src/clp_s/ReaderUtils.cpp b/components/core/src/clp_s/ReaderUtils.cpp index c72425683..11583a60d 100644 --- a/components/core/src/clp_s/ReaderUtils.cpp +++ b/components/core/src/clp_s/ReaderUtils.cpp @@ -109,23 +109,31 @@ std::shared_ptr ReaderUtils::read_schemas(std::string co throw OperationFailed(error_code, __FILENAME__, __LINE__); } - size_t schema_node_size; + uint32_t schema_node_size; error_code = schema_id_decompressor.try_read_numeric_value(schema_node_size); if (ErrorCodeSuccess != error_code) { throw OperationFailed(error_code, __FILENAME__, __LINE__); } + uint32_t num_ordered_nodes; + error_code = schema_id_decompressor.try_read_numeric_value(num_ordered_nodes); + if (ErrorCodeSuccess != error_code) { + throw OperationFailed(error_code, __FILENAME__, __LINE__); + } + auto& schema = schemas[schema_id]; - for (size_t j = 0; j < schema_node_size; j++) { - int32_t node_id; - error_code = schema_id_decompressor.try_read_numeric_value(node_id); - if (ErrorCodeSuccess != error_code) { - throw OperationFailed(error_code, __FILENAME__, __LINE__); - } - - // Maintain schema ordering defined at compression time - schema.insert_unordered(node_id); + if (0 == schema_node_size) { + continue; + } + 
schema.resize(schema_node_size); + error_code = schema_id_decompressor.try_read_exact_length( + reinterpret_cast(schema.begin().base()), + sizeof(int32_t) * schema_node_size + ); + if (ErrorCodeSuccess != error_code) { + throw OperationFailed(error_code, __FILENAME__, __LINE__); } + schema.set_num_ordered(num_ordered_nodes); } schema_id_decompressor.close(); diff --git a/components/core/src/clp_s/Schema.hpp b/components/core/src/clp_s/Schema.hpp index ec4759af6..b16cca098 100644 --- a/components/core/src/clp_s/Schema.hpp +++ b/components/core/src/clp_s/Schema.hpp @@ -3,8 +3,13 @@ #include #include +#include #include +#include "SchemaTree.hpp" +#include "TraceableException.hpp" +#include "Utils.hpp" + namespace clp_s { /** * Class representing a schema made up of MST nodes. @@ -12,12 +17,19 @@ namespace clp_s { * Internally, the schema is represented by a vector where the first m_num_ordered entries are * ordered by MST node ID, and the following entries are allowed to have arbitrary order. * - * In the current implementation of clp-s, MST node IDs in a schema must be unique, so the caller - * is responsible for not inserting duplicate MST nodes into a schema. Future versions of clp-s will - * likely relax this requirement. + * In the current implementation of clp-s, MST node IDs must be unique in the ordered region of a + * schema, but can be repeated in the unordered region. The caller is responsible for not inserting + * duplicate MST nodes into the ordered region of a schema. */ class Schema { public: + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + }; + /** * Inserts a node into the ordered region of the schema. */ @@ -42,6 +54,18 @@ class Schema { m_num_ordered = 0; } + /** + * Sets the number of ordered nodes present in the schema. 
This method is used during + * decompression to help initialize this object. + * @param num_ordered + */ + void set_num_ordered(size_t num_ordered) { m_num_ordered = num_ordered; } + + /** + * @return the number of ordered elements in the underlying schema + */ + [[nodiscard]] size_t get_num_ordered() const { return m_num_ordered; } + /** * @return the number of elements in the underlying schema */ @@ -77,6 +101,36 @@ class Schema { */ [[nodiscard]] auto cend() const { return m_schema.cend(); } + /** + * @return the nth value in the underlying schema + */ + int32_t operator[](size_t i) const { return m_schema[i]; } + + /** + * @return a view into the ordered region of the underlying schema + */ + [[nodiscard]] std::span get_ordered_schema_view() { + return std::span{m_schema.data(), m_num_ordered}; + } + + /** + * @param i + * @param size + * @return a view into the requested region of the schema + */ + [[nodiscard]] std::span get_view(size_t i, size_t size) { + if (i + size > m_schema.size()) { + throw OperationFailed(ErrorCodeOutOfBounds, __FILENAME__, __LINE__); + } + return std::span{m_schema.data() + i, size}; + } + + /** + * Resizes the internal schema vector to match the given length. + * @param size + */ + void resize(size_t size) { m_schema.resize(size); } + /** * Less than comparison operator so that Schema can act as a key for SchemaMap * @return true if this schema is less than the schema on the right hand side @@ -91,7 +145,68 @@ class Schema { */ bool operator==(Schema const& rhs) const { return m_schema == rhs.m_schema; } + /** + * Starts an unordered object of a given NodeType. + * + * Unordered objects must be closed by calling the `end_unordered_object` method with the start + * position returned by this method. 
+ * @param object_type + * @return the start position of the unordered object + */ + [[nodiscard]] size_t start_unordered_object(NodeType object_type) { + insert_unordered(encode_node_type_as_schema_entry(object_type)); + return m_schema.size(); + } + + /** + * Ends an unordered object which was started by calling `start_unordered_object`. + * @param start_position + */ + void end_unordered_object(size_t start_position) { + m_schema[start_position - 1] |= static_cast(m_schema.size() - start_position); + } + + /** + * Encodes a NodeType as a schema entry to delimit an unordered object using bithacks. + * @param node_type + * @return The NodeType encoded as a schema entry + */ + static int32_t encode_node_type_as_schema_entry(NodeType node_type) { + return static_cast(node_type) << cEncodedTypeOffset; + } + + /** + * Checks if a schema entry is the delimeter for an unordered object. + * @param schema_entry + * @return true if the schema_entry is the delimiter for an unordered object, false otherwise + */ + static bool schema_entry_is_unordered_object(int32_t schema_entry) { + return 0 != (schema_entry & cEncodedTypeBitmask); + } + + /** + * Extracts the NodeType from an unordered object delimiter. + * @param schema_entry + * @return The extracted NodeType + */ + static NodeType get_unordered_object_type(int32_t schema_entry) { + return static_cast(static_cast(schema_entry) >> cEncodedTypeOffset); + } + + /** + * Extracts the unordered object length from an unordered object delimiter. 
+ * @param schema_entry + * @return The extracted object length + */ + static int32_t get_unordered_object_length(int32_t schema_entry) { + return schema_entry & cEncodedTypeLengthBitmask; + } + private: + static constexpr size_t cEncodedTypeOffset = (sizeof(int32_t) - 1) * 8; + static constexpr int32_t cEncodedTypeBitmask = 0xFF00'0000; + static constexpr int32_t cEncodedTypeLengthBitmask = ~cEncodedTypeBitmask; + std::vector m_schema; size_t m_num_ordered{0}; }; diff --git a/components/core/src/clp_s/SchemaMap.cpp b/components/core/src/clp_s/SchemaMap.cpp index d7830475b..fa5acd2ef 100644 --- a/components/core/src/clp_s/SchemaMap.cpp +++ b/components/core/src/clp_s/SchemaMap.cpp @@ -28,7 +28,8 @@ size_t SchemaMap::store(std::string const& archives_dir, int compression_level) for (auto const& schema_mapping : m_schema_map) { auto const& schema = schema_mapping.first; schema_map_compressor.write_numeric_value(schema_mapping.second); - schema_map_compressor.write_numeric_value(schema.size()); + schema_map_compressor.write_numeric_value(static_cast(schema.size())); + schema_map_compressor.write_numeric_value(static_cast(schema.get_num_ordered())); for (int32_t mst_node_id : schema) { schema_map_compressor.write_numeric_value(mst_node_id); } diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index b786f2287..b12f4b651 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -1,28 +1,33 @@ #include "SchemaReader.hpp" +#include + +#include "BufferViewReader.hpp" +#include "Schema.hpp" + namespace clp_s { void SchemaReader::append_column(BaseColumnReader* column_reader) { m_column_map[column_reader->get_id()] = column_reader; m_columns.push_back(column_reader); - // The local schema tree is only necessary for generating the JSON template to marshal records. 
- if (m_should_marshal_records) { - generate_local_tree(column_reader->get_id()); - } +} + +void SchemaReader::append_unordered_column(BaseColumnReader* column_reader) { + m_columns.push_back(column_reader); } void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) { m_timestamp_column = column_reader; - if (m_timestamp_column->get_type() == NodeType::DATESTRING) { + if (m_timestamp_column->get_type() == NodeType::DateString) { m_get_timestamp = [this]() { return static_cast(m_timestamp_column) ->get_encoded_time(m_cur_message); }; - } else if (m_timestamp_column->get_type() == NodeType::INTEGER) { + } else if (m_timestamp_column->get_type() == NodeType::Integer) { m_get_timestamp = [this]() { return std::get(static_cast(m_timestamp_column) ->extract_value(m_cur_message)); }; - } else if (m_timestamp_column->get_type() == NodeType::FLOAT) { + } else if (m_timestamp_column->get_type() == NodeType::Float) { m_get_timestamp = [this]() { return static_cast( std::get(static_cast(m_timestamp_column) @@ -32,20 +37,22 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) { } } -void SchemaReader::append_column(int32_t id) { - // The local schema tree is only necessary for generating the JSON template to marshal records. 
- if (m_should_marshal_records) { - generate_local_tree(id); +void SchemaReader::load(ZstdDecompressor& decompressor, size_t uncompressed_size) { + if (uncompressed_size > m_table_buffer_size) { + m_table_buffer = std::make_unique(uncompressed_size); + m_table_buffer_size = uncompressed_size; + } + auto error = decompressor.try_read_exact_length(m_table_buffer.get(), uncompressed_size); + if (ErrorCodeSuccess != error) { + throw OperationFailed(error, __FILENAME__, __LINE__); } -} -void SchemaReader::load(ZstdDecompressor& decompressor) { + BufferViewReader buffer_reader{m_table_buffer.get(), uncompressed_size}; for (auto& reader : m_columns) { - reader->load(decompressor, m_num_messages); + reader->load(buffer_reader, m_num_messages); } - - if (m_should_marshal_records) { - generate_json_template(0); + if (buffer_reader.get_remaining_size() > 0) { + throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__); } } @@ -67,7 +74,15 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddIntField: { column = m_reordered_columns[column_id_index++]; - m_json_serializer.append_key(column->get_name()); + auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + m_json_serializer.append_key(name); + m_json_serializer.append_value( + std::to_string(std::get(column->extract_value(m_cur_message))) + ); + break; + } + case JsonSerializer::Op::AddIntValue: { + column = m_reordered_columns[column_id_index++]; m_json_serializer.append_value( std::to_string(std::get(column->extract_value(m_cur_message))) ); @@ -75,7 +90,15 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddFloatField: { column = m_reordered_columns[column_id_index++]; - m_json_serializer.append_key(column->get_name()); + auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + m_json_serializer.append_key(name); + m_json_serializer.append_value( + 
std::to_string(std::get(column->extract_value(m_cur_message))) + ); + break; + } + case JsonSerializer::Op::AddFloatValue: { + column = m_reordered_columns[column_id_index++]; m_json_serializer.append_value( std::to_string(std::get(column->extract_value(m_cur_message))) ); @@ -83,7 +106,16 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddBoolField: { column = m_reordered_columns[column_id_index++]; - m_json_serializer.append_key(column->get_name()); + auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + m_json_serializer.append_key(name); + m_json_serializer.append_value( + std::get(column->extract_value(m_cur_message)) != 0 ? "true" + : "false" + ); + break; + } + case JsonSerializer::Op::AddBoolValue: { + column = m_reordered_columns[column_id_index++]; m_json_serializer.append_value( std::get(column->extract_value(m_cur_message)) != 0 ? "true" : "false" @@ -92,13 +124,21 @@ void SchemaReader::generate_json_string() { } case JsonSerializer::Op::AddStringField: { column = m_reordered_columns[column_id_index++]; - m_json_serializer.append_key(column->get_name()); + auto const& name = m_global_schema_tree->get_node(column->get_id()).get_key_name(); + m_json_serializer.append_key(name); + m_json_serializer.append_value_from_column_with_quotes(column, m_cur_message); + break; + } + case JsonSerializer::Op::AddStringValue: { + column = m_reordered_columns[column_id_index++]; m_json_serializer.append_value_from_column_with_quotes(column, m_cur_message); break; } case JsonSerializer::Op::AddArrayField: { column = m_reordered_columns[column_id_index++]; - m_json_serializer.append_key(column->get_name()); + m_json_serializer.append_key( + m_global_schema_tree->get_node(column->get_id()).get_key_name() + ); m_json_serializer.append_value_from_column(column, m_cur_message); break; } @@ -107,6 +147,10 @@ void SchemaReader::generate_json_string() { m_json_serializer.append_value("null"); break; } + case 
JsonSerializer::Op::AddNullValue: { + m_json_serializer.append_value("null"); + break; + } } } @@ -118,6 +162,9 @@ bool SchemaReader::get_next_message(std::string& message) { return false; } + if (false == m_serializer_initialized) { + initialize_serializer(); + } generate_json_string(); message = m_json_serializer.get_serialized_string(); @@ -132,12 +179,15 @@ bool SchemaReader::get_next_message(std::string& message) { bool SchemaReader::get_next_message(std::string& message, FilterClass* filter) { while (m_cur_message < m_num_messages) { - if (false == filter->filter(m_cur_message, m_extracted_values)) { + if (false == filter->filter(m_cur_message)) { m_cur_message++; continue; } if (m_should_marshal_records) { + if (false == m_serializer_initialized) { + initialize_serializer(); + } generate_json_string(); message = m_json_serializer.get_serialized_string(); @@ -161,12 +211,15 @@ bool SchemaReader::get_next_message_with_timestamp( // TODO: If we already get max_num_results messages, we can skip messages // with the timestamp less than the smallest timestamp in the priority queue while (m_cur_message < m_num_messages) { - if (false == filter->filter(m_cur_message, m_extracted_values)) { + if (false == filter->filter(m_cur_message)) { m_cur_message++; continue; } if (m_should_marshal_records) { + if (false == m_serializer_initialized) { + initialize_serializer(); + } generate_json_string(); message = m_json_serializer.get_serialized_string(); @@ -185,76 +238,126 @@ bool SchemaReader::get_next_message_with_timestamp( } void SchemaReader::initialize_filter(FilterClass* filter) { - filter->init(this, m_schema_id, m_column_map); + filter->init(this, m_schema_id, m_columns); } void SchemaReader::generate_local_tree(int32_t global_id) { - auto node = m_global_schema_tree->get_node(global_id); - int32_t parent_id = node->get_parent_id(); + std::stack global_id_stack; + global_id_stack.emplace(global_id); + do { + auto const& node = 
m_global_schema_tree->get_node(global_id_stack.top()); + int32_t parent_id = node.get_parent_id(); + + auto it = m_global_id_to_local_id.find(parent_id); + if (-1 != parent_id && it == m_global_id_to_local_id.end()) { + global_id_stack.emplace(parent_id); + continue; + } + + int32_t local_id = m_local_schema_tree.add_node( + parent_id == -1 ? -1 : m_global_id_to_local_id[parent_id], + node.get_type(), + node.get_key_name() + ); + + m_global_id_to_local_id[global_id_stack.top()] = local_id; + m_local_id_to_global_id[local_id] = global_id_stack.top(); + global_id_stack.pop(); + } while (false == global_id_stack.empty()); +} - if (parent_id != -1 && m_global_id_to_local_id.find(parent_id) == m_global_id_to_local_id.end()) +void SchemaReader::mark_unordered_object( + size_t column_reader_start, + int32_t mst_subtree_root, + std::span schema +) { + m_global_id_to_unordered_object.emplace( + mst_subtree_root, + std::make_pair(column_reader_start, schema) + ); +} + +int32_t SchemaReader::get_first_column_in_span(std::span schema) { + for (int32_t column_id : schema) { + if (false == Schema::schema_entry_is_unordered_object(column_id)) { + return column_id; + } + } + return -1; +} + +void SchemaReader::initialize_serializer() { + if (m_serializer_initialized) { + return; + } + + m_serializer_initialized = true; + + for (int32_t global_column_id : m_ordered_schema) { + generate_local_tree(global_column_id); + } + + for (auto it = m_global_id_to_unordered_object.begin(); + it != m_global_id_to_unordered_object.end(); + ++it) { - generate_local_tree(parent_id); + generate_local_tree(it->first); } - int32_t local_id = m_local_schema_tree->add_node( - parent_id == -1 ? -1 : m_global_id_to_local_id[parent_id], - node->get_type(), - node->get_key_name() - ); - m_global_id_to_local_id[global_id] = local_id; - m_local_id_to_global_id[local_id] = global_id; + // TODO: this code will have to change once we allow mixing log lines parsed by different + // parsers. 
+ generate_json_template(0); } void SchemaReader::generate_json_template(int32_t id) { - auto node = m_local_schema_tree->get_node(id); - auto children_ids = node->get_children_ids(); + auto const& node = m_local_schema_tree.get_node(id); + auto const& children_ids = node.get_children_ids(); for (int32_t child_id : children_ids) { int32_t child_global_id = m_local_id_to_global_id[child_id]; - auto child_node = m_local_schema_tree->get_node(child_id); - std::string const& key = child_node->get_key_name(); - switch (child_node->get_type()) { - case NodeType::OBJECT: { + auto const& child_node = m_local_schema_tree.get_node(child_id); + std::string const& key = child_node.get_key_name(); + switch (child_node.get_type()) { + case NodeType::Object: { m_json_serializer.add_op(JsonSerializer::Op::BeginObject); m_json_serializer.add_special_key(key); generate_json_template(child_id); m_json_serializer.add_op(JsonSerializer::Op::EndObject); break; } - case NodeType::ARRAY: { + case NodeType::UnstructuredArray: { m_json_serializer.add_op(JsonSerializer::Op::AddArrayField); m_reordered_columns.push_back(m_column_map[child_global_id]); break; } - case NodeType::INTEGER: { + case NodeType::Integer: { m_json_serializer.add_op(JsonSerializer::Op::AddIntField); m_reordered_columns.push_back(m_column_map[child_global_id]); break; } - case NodeType::FLOAT: { + case NodeType::Float: { m_json_serializer.add_op(JsonSerializer::Op::AddFloatField); m_reordered_columns.push_back(m_column_map[child_global_id]); break; } - case NodeType::BOOLEAN: { + case NodeType::Boolean: { m_json_serializer.add_op(JsonSerializer::Op::AddBoolField); m_reordered_columns.push_back(m_column_map[child_global_id]); break; } - case NodeType::CLPSTRING: - case NodeType::VARSTRING: - case NodeType::DATESTRING: { + case NodeType::ClpString: + case NodeType::VarString: + case NodeType::DateString: { m_json_serializer.add_op(JsonSerializer::Op::AddStringField); 
m_reordered_columns.push_back(m_column_map[child_global_id]); break; } - case NodeType::NULLVALUE: { + case NodeType::NullValue: { m_json_serializer.add_op(JsonSerializer::Op::AddNullField); m_json_serializer.add_special_key(key); break; } - case NodeType::UNKNOWN: + case NodeType::Unknown: break; } } diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index bfc507565..6ea5f57df 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -1,6 +1,7 @@ #ifndef CLP_S_SCHEMAREADER_HPP #define CLP_S_SCHEMAREADER_HPP +#include #include #include #include @@ -21,56 +22,87 @@ class FilterClass { * Initializes the filter * @param reader * @param schema_id - * @param columns + * @param column_readers */ virtual void init( SchemaReader* reader, int32_t schema_id, - std::unordered_map& columns + std::vector const& column_readers ) = 0; /** * Filters the message * @param cur_message - * @param extracted_values * @return true if the message is accepted */ - virtual bool filter( - uint64_t cur_message, - std::map>& extracted_values - ) = 0; + virtual bool filter(uint64_t cur_message) = 0; }; class SchemaReader { public: + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + }; + struct TableMetadata { uint64_t num_messages; size_t offset; + size_t uncompressed_size; }; // Constructor - explicit SchemaReader( - std::shared_ptr schema_tree, - int32_t schema_id, - uint64_t num_messages, - bool should_marshal_records - ) - : m_schema_id(schema_id), - m_num_messages(num_messages), - m_cur_message(0), - m_timestamp_column(nullptr), - m_get_timestamp([]() -> epochtime_t { return 0; }), - m_global_schema_tree(std::move(schema_tree)), - m_local_schema_tree(std::make_unique()), - 
m_should_marshal_records(should_marshal_records) {} + SchemaReader() {} // Destructor - ~SchemaReader() { + ~SchemaReader() { delete_columns(); } + + void delete_columns() { for (auto& i : m_columns) { delete i; } } + /** + * Resets the contents of this SchemaReader and prepares it to become a SchemaReader with a new + * schema id, schema tree, and other parameters. After this call the SchemaReader is prepared + * to accept append_column calls for the new schema. + * + * @param schema_tree + * @param schema_id + * @param ordered_schema + * @param num_messages + * @param should_marshal_records + */ + void reset( + std::shared_ptr schema_tree, + int32_t schema_id, + std::span ordered_schema, + uint64_t num_messages, + bool should_marshal_records + ) { + m_schema_id = schema_id; + m_num_messages = num_messages; + m_cur_message = 0; + m_serializer_initialized = false; + m_ordered_schema = ordered_schema; + delete_columns(); + m_column_map.clear(); + m_columns.clear(); + m_reordered_columns.clear(); + m_timestamp_column = nullptr; + m_get_timestamp = []() -> epochtime_t { return 0; }; + m_local_id_to_global_id.clear(); + m_global_id_to_local_id.clear(); + m_global_id_to_unordered_object.clear(); + m_local_schema_tree.clear(); + m_json_serializer.clear(); + m_global_schema_tree = std::move(schema_tree); + m_should_marshal_records = should_marshal_records; + } + /** * Appends a column to the schema reader * @param column_reader @@ -78,16 +110,31 @@ class SchemaReader { void append_column(BaseColumnReader* column_reader); /** - * Appends a column to the schema reader - * @param id + * Appends an unordered column to the schema reader + * @param column_reader */ - void append_column(int32_t id); + void append_unordered_column(BaseColumnReader* column_reader); + + size_t get_column_size() { return m_columns.size(); } + + /** + * Marks an unordered object for the purpose of marshalling records. 
+ * @param column_reader_start + * @param mst_subtree_root + * @param schema + */ + void mark_unordered_object( + size_t column_reader_start, + int32_t mst_subtree_root, + std::span schema + ); /** * Loads the encoded messages * @param decompressor + * @param uncompressed_size */ - void load(ZstdDecompressor& decompressor); + void load(ZstdDecompressor& decompressor, size_t uncompressed_size); /** * Gets next message @@ -133,44 +180,60 @@ class SchemaReader { private: /** - * Generates a local schema tree + * Merges the current local schema tree with the section of the global schema tree corresponding + * to the path from the root of the global schema tree to the node matching the global MPT node + * id passed to this function. * @param global_id */ void generate_local_tree(int32_t global_id); /** * Generates a json template - * @param object * @param id - * @param json_pointer */ void generate_json_template(int32_t id); + /** + * @param schema + * @return the first column ID found in the given schema, or -1 if the schema contains no + * columns + */ + static inline int32_t get_first_column_in_span(std::span schema); + /** * Generates a json string from the extracted values */ void generate_json_string(); + /** + * Initializes all internal data structures required to serialize records.
+ */ + void initialize_serializer(); + int32_t m_schema_id; uint64_t m_num_messages; uint64_t m_cur_message; + std::span m_ordered_schema; std::unordered_map m_column_map; std::vector m_columns; std::vector m_reordered_columns; + std::unique_ptr m_table_buffer; + size_t m_table_buffer_size{0}; BaseColumnReader* m_timestamp_column; std::function m_get_timestamp; std::shared_ptr m_global_schema_tree; - std::unique_ptr m_local_schema_tree; + SchemaTree m_local_schema_tree; std::unordered_map m_global_id_to_local_id; std::unordered_map m_local_id_to_global_id; JsonSerializer m_json_serializer; bool m_should_marshal_records{true}; + bool m_serializer_initialized{false}; - std::map> m_extracted_values; + std::map>> m_global_id_to_unordered_object; }; } // namespace clp_s diff --git a/components/core/src/clp_s/SchemaTree.cpp b/components/core/src/clp_s/SchemaTree.cpp index 879023cc7..e56168a2c 100644 --- a/components/core/src/clp_s/SchemaTree.cpp +++ b/components/core/src/clp_s/SchemaTree.cpp @@ -10,17 +10,17 @@ int32_t SchemaTree::add_node(int32_t parent_node_id, NodeType type, std::string auto node_it = m_node_map.find(node_key); if (node_it != m_node_map.end()) { auto node_id = node_it->second; - m_nodes[node_id]->increase_count(); + m_nodes[node_id].increase_count(); return node_id; } - auto node = std::make_shared(parent_node_id, m_nodes.size(), key, type); - node->increase_count(); - m_nodes.push_back(node); - int32_t node_id = node->get_id(); + int32_t node_id = m_nodes.size(); + auto& node = m_nodes.emplace_back(parent_node_id, node_id, key, type, 0); + node.increase_count(); if (parent_node_id >= 0) { - auto parent_node = m_nodes[parent_node_id]; - parent_node->add_child(node_id); + auto& parent_node = m_nodes[parent_node_id]; + node.set_depth(parent_node.get_depth() + 1); + parent_node.add_child(node_id); } m_node_map[node_key] = node_id; @@ -39,12 +39,12 @@ size_t SchemaTree::store(std::string const& archives_dir, int compression_level) 
schema_tree_compressor.write_numeric_value(m_nodes.size()); for (auto const& node : m_nodes) { - schema_tree_compressor.write_numeric_value(node->get_parent_id()); + schema_tree_compressor.write_numeric_value(node.get_parent_id()); - std::string const& key = node->get_key_name(); + std::string const& key = node.get_key_name(); schema_tree_compressor.write_numeric_value(key.size()); schema_tree_compressor.write_string(key); - schema_tree_compressor.write_numeric_value(node->get_type()); + schema_tree_compressor.write_numeric_value(node.get_type()); } schema_tree_compressor.close(); @@ -52,4 +52,21 @@ size_t SchemaTree::store(std::string const& archives_dir, int compression_level) schema_tree_writer.close(); return compressed_size; } + +int32_t SchemaTree::find_matching_subtree_root_in_subtree( + int32_t const subtree_root_node, + int32_t node, + NodeType type +) const { + int32_t earliest_match = -1; + while (subtree_root_node != node) { + auto const& schema_node = get_node(node); + if (schema_node.get_type() == type) { + earliest_match = node; + } + node = schema_node.get_parent_id(); + } + return earliest_match; +} + } // namespace clp_s diff --git a/components/core/src/clp_s/SchemaTree.hpp b/components/core/src/clp_s/SchemaTree.hpp index b4881ed07..3b9a8d36f 100644 --- a/components/core/src/clp_s/SchemaTree.hpp +++ b/components/core/src/clp_s/SchemaTree.hpp @@ -11,29 +11,30 @@ namespace clp_s { enum class NodeType : uint8_t { - INTEGER, - FLOAT, - CLPSTRING, - VARSTRING, - BOOLEAN, - OBJECT, - ARRAY, - NULLVALUE, - DATESTRING, - UNKNOWN + Integer, + Float, + ClpString, + VarString, + Boolean, + Object, + UnstructuredArray, + NullValue, + DateString, + Unknown = std::underlying_type::type(~0ULL) }; class SchemaNode { public: // Constructor - SchemaNode() : m_parent_id(-1), m_id(-1), m_type(NodeType::INTEGER), m_count(0) {} + SchemaNode() : m_parent_id(-1), m_id(-1), m_type(NodeType::Integer), m_count(0) {} - SchemaNode(int32_t parent_id, int32_t id, std::string 
key_name, NodeType type) + SchemaNode(int32_t parent_id, int32_t id, std::string key_name, NodeType type, int32_t depth) : m_parent_id(parent_id), m_id(id), m_key_name(std::move(key_name)), m_type(type), - m_count(0) {} + m_count(0), + m_depth(depth) {} /** * Getters @@ -42,7 +43,7 @@ class SchemaNode { int32_t get_parent_id() const { return m_parent_id; } - std::vector& get_children_ids() { return m_children_ids; } + std::vector const& get_children_ids() const { return m_children_ids; } NodeType get_type() const { return m_type; } @@ -50,6 +51,10 @@ class SchemaNode { int32_t get_count() const { return m_count; } + int32_t get_depth() const { return m_depth; } + + void set_depth(int32_t depth) { m_depth = depth; } + /** * Increases the count of this node by 1 */ @@ -68,6 +73,7 @@ class SchemaNode { std::string m_key_name; NodeType m_type; int32_t m_count; + int32_t m_depth{0}; }; class SchemaTree { @@ -78,7 +84,7 @@ class SchemaTree { bool has_node(int32_t id) { return id < m_nodes.size() && id >= 0; } - std::shared_ptr get_node(int32_t id) { + SchemaNode const& get_node(int32_t id) const { if (id >= m_nodes.size() || id < 0) { throw std::invalid_argument("invalid access of id " + std::to_string(id)); } @@ -86,9 +92,9 @@ class SchemaTree { return m_nodes[id]; } - int32_t get_root_node_id() { return m_nodes[0]->get_id(); } + int32_t get_root_node_id() const { return m_nodes[0].get_id(); } - std::vector> get_nodes() { return m_nodes; } + std::vector const& get_nodes() const { return m_nodes; } /** * Write the contents of the SchemaTree to the schema tree file @@ -106,8 +112,22 @@ class SchemaTree { m_node_map.clear(); } + /** + * Finds an ancestor node within a subtree that matches the given type. When multiple matching + * nodes exist, returns the one closest to the root node of the subtree. 
+ * @param subtree_root_node The root node of the subtree + * @param node The node to start searching from + * @param type The type of the ancestor node to find + * @return The ID of the ancestor node if it exists, otherwise -1 + */ + [[nodiscard]] int32_t find_matching_subtree_root_in_subtree( + int32_t const subtree_root_node, + int32_t node, + NodeType type + ) const; + private: - std::vector> m_nodes; + std::vector m_nodes; absl::flat_hash_map, int32_t> m_node_map; }; } // namespace clp_s diff --git a/components/core/src/clp_s/SchemaWriter.cpp b/components/core/src/clp_s/SchemaWriter.cpp index 0c476d1ba..b6de15f2f 100644 --- a/components/core/src/clp_s/SchemaWriter.cpp +++ b/components/core/src/clp_s/SchemaWriter.cpp @@ -17,14 +17,22 @@ size_t SchemaWriter::append_message(ParsedMessage& message) { count++; } + for (auto& i : message.get_unordered_content()) { + m_columns[count]->add_value(i, size); + total_size += size; + ++count; + } + m_num_messages++; return total_size; } -void SchemaWriter::store(ZstdCompressor& compressor) { +size_t SchemaWriter::store(ZstdCompressor& compressor) { + size_t total_size = 0; for (auto& writer : m_columns) { - writer->store(compressor); + total_size += writer->store(compressor); } + return total_size; } SchemaWriter::~SchemaWriter() { diff --git a/components/core/src/clp_s/SchemaWriter.hpp b/components/core/src/clp_s/SchemaWriter.hpp index e8f537799..41b28600b 100644 --- a/components/core/src/clp_s/SchemaWriter.hpp +++ b/components/core/src/clp_s/SchemaWriter.hpp @@ -40,8 +40,9 @@ class SchemaWriter { /** * Stores the columns to disk. * @param compressor + * @return the uncompressed in-memory size of the table */ - void store(ZstdCompressor& compressor); + [[nodiscard]] size_t store(ZstdCompressor& compressor); /** * Closes the schema writer.
@@ -55,6 +56,7 @@ class SchemaWriter { uint64_t m_num_messages; std::vector m_columns; + std::vector m_unordered_columns; }; } // namespace clp_s diff --git a/components/core/src/clp_s/Utils.hpp b/components/core/src/clp_s/Utils.hpp index fefdce07c..5a65ca7f2 100644 --- a/components/core/src/clp_s/Utils.hpp +++ b/components/core/src/clp_s/Utils.hpp @@ -2,6 +2,7 @@ #define CLP_S_UTILS_HPP #include +#include #include #include @@ -247,27 +248,42 @@ inline T2 bit_cast(T1 t1) { } /** - * A span of memory + * A span of memory where the underlying memory may not be aligned correctly for type T. + * + * This class should be used whenever we need a view into some memory, and we do not know whether it + * is aligned correctly for type T. If the alignment of the underlying memory is known std::span + * should be used instead. + * + * In C++ creating a pointer to objects of type T that is not correctly aligned for type T is + * undefined behaviour, as is dereferencing such a pointer. This class avoids this undefined + * behaviour by using memcpy (which any modern compiler should be able to optimize away). + * + * For any modern x86 platform the performance difference between using std::span and + * UnalignedMemSpan should be fairly minimal. 
+ * * @tparam T */ template -class Span { +class UnalignedMemSpan { public: - Span() = default; - Span(T* begin, size_t size) : m_begin(begin), m_size(size) {}; - - T* begin() { return m_begin; } - - T* end() { return m_begin + m_size; } + UnalignedMemSpan() = default; + UnalignedMemSpan(char* begin, size_t size) : m_begin(begin), m_size(size) {}; size_t size() { return m_size; } - T& operator[](size_t i) { return m_begin[i]; } + T operator[](size_t i) { + T tmp; + memcpy(&tmp, m_begin + i * sizeof(T), sizeof(T)); + return tmp; + } + + UnalignedMemSpan sub_span(size_t start, size_t size) { + return {m_begin + start * sizeof(T), size}; + } private: - T* m_begin; - size_t m_size{}; + char* m_begin{nullptr}; + size_t m_size{0}; }; } // namespace clp_s - #endif // CLP_S_UTILS_HPP diff --git a/components/core/src/clp_s/VariableDecoder.cpp b/components/core/src/clp_s/VariableDecoder.cpp index ff91a87bb..a4c36ce7b 100644 --- a/components/core/src/clp_s/VariableDecoder.cpp +++ b/components/core/src/clp_s/VariableDecoder.cpp @@ -6,7 +6,7 @@ namespace clp_s { bool VariableDecoder::decode_variables_into_message( LogTypeDictionaryEntry const& logtype_dict_entry, VariableDictionaryReader const& var_dict, - Span encoded_vars, + UnalignedMemSpan encoded_vars, std::string& decompressed_msg ) { size_t num_vars_in_logtype = logtype_dict_entry.get_num_vars(); diff --git a/components/core/src/clp_s/VariableDecoder.hpp b/components/core/src/clp_s/VariableDecoder.hpp index f99a08dad..82f8dc701 100644 --- a/components/core/src/clp_s/VariableDecoder.hpp +++ b/components/core/src/clp_s/VariableDecoder.hpp @@ -20,7 +20,7 @@ class VariableDecoder { static bool decode_variables_into_message( LogTypeDictionaryEntry const& logtype_dict_entry, VariableDictionaryReader const& var_dict, - Span encoded_vars, + UnalignedMemSpan encoded_vars, std::string& decompressed_msg ); diff --git a/components/core/src/clp_s/ZstdDecompressor.cpp b/components/core/src/clp_s/ZstdDecompressor.cpp index 
ee1632732..87d3ae8fa 100644 --- a/components/core/src/clp_s/ZstdDecompressor.cpp +++ b/components/core/src/clp_s/ZstdDecompressor.cpp @@ -148,8 +148,12 @@ void ZstdDecompressor::open(FileReader& file_reader, size_t file_read_buffer_cap m_file_reader = &file_reader; m_file_reader_initial_pos = m_file_reader->get_pos(); - m_file_read_buffer_capacity = file_read_buffer_capacity; - m_file_read_buffer = std::make_unique(m_file_read_buffer_capacity); + // Avoid reallocating the internal buffer if this instance is being re-used with an + // unchanged buffer size. + if (file_read_buffer_capacity != m_file_read_buffer_capacity) { + m_file_read_buffer_capacity = file_read_buffer_capacity; + m_file_read_buffer = std::make_unique(m_file_read_buffer_capacity); + } m_file_read_buffer_length = 0; m_compressed_stream_block = {m_file_read_buffer.get(), m_file_read_buffer_length, 0}; @@ -181,6 +185,16 @@ void ZstdDecompressor::close() { m_input_type = InputType::NotInitialized; } +void ZstdDecompressor::close_for_reuse() { + if (InputType::File != m_input_type) { + close(); + return; + } + m_file_read_buffer_length = 0; + m_file_reader = nullptr; + m_input_type = InputType::NotInitialized; +} + ErrorCode ZstdDecompressor::open(std::string const& compressed_file_path) { if (InputType::NotInitialized != m_input_type) { throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__); diff --git a/components/core/src/clp_s/ZstdDecompressor.hpp b/components/core/src/clp_s/ZstdDecompressor.hpp index 7ac9d31f4..93eede0c8 100644 --- a/components/core/src/clp_s/ZstdDecompressor.hpp +++ b/components/core/src/clp_s/ZstdDecompressor.hpp @@ -44,6 +44,12 @@ class ZstdDecompressor : public Decompressor { void close() override; + /** + * Closes this ZstdDecompressor with a hint that the ZstdDecompressor will be reused. This + * makes the ZstdDecompressor hold onto its internal buffer, avoiding reallocations. 
+ */ + void close_for_reuse(); + // Methods /*** * Initialize streaming decompressor to decompress from a compressed file specified by the given diff --git a/components/core/src/clp_s/search/ColumnDescriptor.cpp b/components/core/src/clp_s/search/ColumnDescriptor.cpp index 970270060..33d5ed6cf 100644 --- a/components/core/src/clp_s/search/ColumnDescriptor.cpp +++ b/components/core/src/clp_s/search/ColumnDescriptor.cpp @@ -26,18 +26,27 @@ ColumnDescriptor::ColumnDescriptor(std::string const& descriptor) { m_flags = cAllTypes; m_descriptors.emplace_back(descriptor); check_and_set_unresolved_descriptor_flag(); + if (is_unresolved_descriptor()) { + simplify_descriptor_wildcards(); + } } ColumnDescriptor::ColumnDescriptor(std::vector const& descriptors) { m_flags = cAllTypes; m_descriptors = std::move(tokenize_descriptor(descriptors)); check_and_set_unresolved_descriptor_flag(); + if (is_unresolved_descriptor()) { + simplify_descriptor_wildcards(); + } } ColumnDescriptor::ColumnDescriptor(DescriptorList const& descriptors) { m_flags = cAllTypes; m_descriptors = descriptors; check_and_set_unresolved_descriptor_flag(); + if (is_unresolved_descriptor()) { + simplify_descriptor_wildcards(); + } } std::shared_ptr ColumnDescriptor::create(std::string const& descriptor) { @@ -94,4 +103,21 @@ bool ColumnDescriptor::operator==(ColumnDescriptor const& rhs) const { && m_unresolved_descriptors == rhs.m_unresolved_descriptors && m_pure_wildcard == rhs.m_pure_wildcard; } + +void ColumnDescriptor::simplify_descriptor_wildcards() { + DescriptorList new_descriptor_list; + bool prev_was_wildcard = false; + for (auto& token : m_descriptors) { + if (prev_was_wildcard && token.wildcard()) { + continue; + } + prev_was_wildcard = token.wildcard(); + new_descriptor_list.push_back(std::move(token)); + } + m_descriptors = std::move(new_descriptor_list); + if (1 == m_descriptors.size()) { + m_pure_wildcard = true; + } +} + } // namespace clp_s::search diff --git 
a/components/core/src/clp_s/search/ColumnDescriptor.hpp b/components/core/src/clp_s/search/ColumnDescriptor.hpp index 39ea68ffd..0dc348817 100644 --- a/components/core/src/clp_s/search/ColumnDescriptor.hpp +++ b/components/core/src/clp_s/search/ColumnDescriptor.hpp @@ -175,6 +175,11 @@ class ColumnDescriptor : public Literal { // only have a single type LiteralType get_literal_type() const { return static_cast(m_flags); } + /** + * @return a bitmask indicating all of the matching types for this column. + */ + LiteralTypeBitmask get_matching_types() const { return m_flags; } + /** * Whether the list of Descriptor's contains any wildcards * @return true if the descriptor contains any wildcards that need to be resolved @@ -224,6 +229,11 @@ class ColumnDescriptor : public Literal { * set the appropriate flags. */ void check_and_set_unresolved_descriptor_flag(); + + /** + * Scans the list of descriptors to eliminate any series of multiple wildcards in a row. + */ + void simplify_descriptor_wildcards(); }; } // namespace clp_s::search diff --git a/components/core/src/clp_s/search/FilterExpr.cpp b/components/core/src/clp_s/search/FilterExpr.cpp index 55c62733c..565f252dc 100644 --- a/components/core/src/clp_s/search/FilterExpr.cpp +++ b/components/core/src/clp_s/search/FilterExpr.cpp @@ -20,6 +20,8 @@ std::string FilterExpr::op_type_str(FilterOperation op) { switch (op) { case FilterOperation::EXISTS: return "EXISTS"; + case FilterOperation::NEXISTS: + return "NEXISTS"; case FilterOperation::EQ: return "EQ"; case FilterOperation::NEQ: diff --git a/components/core/src/clp_s/search/Literal.hpp b/components/core/src/clp_s/search/Literal.hpp index ecea62418..8374e01e6 100644 --- a/components/core/src/clp_s/search/Literal.hpp +++ b/components/core/src/clp_s/search/Literal.hpp @@ -20,7 +20,7 @@ enum LiteralType : uint32_t { ArrayT = 1 << 5, NullT = 1 << 6, EpochDateT = 1 << 7, - TypesEnd = 1 << 9, + TypesEnd = 1 << 8, UnknownT = ((uint32_t)1) << 31 }; diff --git 
a/components/core/src/clp_s/search/Output.cpp b/components/core/src/clp_s/search/Output.cpp index 61edd2cd1..6863bba35 100644 --- a/components/core/src/clp_s/search/Output.cpp +++ b/components/core/src/clp_s/search/Output.cpp @@ -71,10 +71,8 @@ bool Output::filter() { m_expr_clp_query.clear(); m_expr_var_match_map.clear(); m_expr = m_match.get_query_for_schema(schema_id)->copy(); - m_wildcard_to_searched_columns.clear(); - m_wildcard_to_searched_clpstrings.clear(); - m_wildcard_to_searched_varstrings.clear(); - m_wildcard_to_searched_datestrings.clear(); + m_wildcard_to_searched_basic_columns.clear(); + m_wildcard_columns.clear(); m_schema = schema_id; populate_searched_wildcard_columns(m_expr); @@ -87,20 +85,20 @@ bool Output::filter() { add_wildcard_columns_to_searched_columns(); - auto reader = m_archive_reader->read_table( + auto& reader = m_archive_reader->read_table( schema_id, m_output_handler->should_output_timestamp(), m_should_marshal_records ); - reader->initialize_filter(this); + reader.initialize_filter(this); if (m_output_handler->should_output_timestamp()) { epochtime_t timestamp; - while (reader->get_next_message_with_timestamp(message, timestamp, this)) { + while (reader.get_next_message_with_timestamp(message, timestamp, this)) { m_output_handler->write(message, timestamp); } } else { - while (reader->get_next_message(message, this)) { + while (reader.get_next_message(message, this)) { m_output_handler->write(message); } } @@ -127,73 +125,64 @@ bool Output::filter() { void Output::init( SchemaReader* reader, int32_t schema_id, - std::unordered_map& columns + std::vector const& column_readers ) { m_reader = reader; m_schema = schema_id; - m_searched_columns.clear(); - m_other_columns.clear(); - - for (auto& column : columns) { - ClpStringColumnReader* clp_reader = dynamic_cast(column.second); - VariableStringColumnReader* var_reader - = dynamic_cast(column.second); - if (m_match.schema_searches_against_column(schema_id, column.first)) { - if 
(clp_reader != nullptr && clp_reader->get_type() == NodeType::CLPSTRING) { - m_clp_string_readers[column.first] = clp_reader; - m_other_columns.push_back(column.second); - } else if (var_reader != nullptr && var_reader->get_type() == NodeType::VARSTRING) { - m_var_string_readers[column.first] = var_reader; - m_other_columns.push_back(column.second); - } else if (auto date_column_reader - = dynamic_cast(column.second)) - { - m_datestring_readers[column.first] = date_column_reader; - m_other_columns.push_back(column.second); + m_clp_string_readers.clear(); + m_var_string_readers.clear(); + m_datestring_readers.clear(); + m_basic_readers.clear(); + + for (auto column_reader : column_readers) { + auto column_id = column_reader->get_id(); + if ((0 + != (m_wildcard_type_mask + & node_to_literal_type(m_schema_tree->get_node(column_id).get_type()))) + || m_match.schema_searches_against_column(schema_id, column_id)) + { + ClpStringColumnReader* clp_reader = dynamic_cast(column_reader); + VariableStringColumnReader* var_reader + = dynamic_cast(column_reader); + DateStringColumnReader* date_reader + = dynamic_cast(column_reader); + if (nullptr != clp_reader && clp_reader->get_type() == NodeType::ClpString) { + m_clp_string_readers[column_id].push_back(clp_reader); + } else if (nullptr != var_reader && var_reader->get_type() == NodeType::VarString) { + m_var_string_readers[column_id].push_back(var_reader); + } else if (nullptr != date_reader) { + // Datestring readers with a given column ID are guaranteed not to repeat + m_datestring_readers.emplace(column_id, date_reader); } else { - m_searched_columns.push_back(column.second); + m_basic_readers[column_id].push_back(column_reader); } - } else { - m_other_columns.push_back(column.second); } } } -bool Output::filter( - uint64_t cur_message, - std::map>& extracted_values -) { - m_cur_message = cur_message; - m_cached_string_columns.clear(); - for (auto* column : m_searched_columns) { - extracted_values[column->get_id()] = 
column->extract_value(cur_message); - } - - // filter - if (false == evaluate(m_expr.get(), m_schema, extracted_values)) { - return false; +std::string& Output::get_cached_decompressed_unstructured_array(int32_t column_id) { + auto it = m_extracted_unstructured_arrays.find(column_id); + if (m_extracted_unstructured_arrays.end() != it) { + return it->second; } - // We only need to extract all columns if we're actually marshalling records into JSON strings. - // TODO: consider getting rid of extracted_values entirely and just accessing the underlying - // columns directly once we have time to clean up search. - if (m_should_marshal_records) { - for (auto* column : m_other_columns) { - if (m_cached_string_columns.find(column->get_id()) == m_cached_string_columns.end()) { - extracted_values[column->get_id()] = column->extract_value(cur_message); - } - } - } + // Unstructured arrays with the same column id can not appear multiple times in one schema + // in the current implementation. + auto rit = m_extracted_unstructured_arrays.emplace( + column_id, + std::get(m_basic_readers[column_id][0]->extract_value(m_cur_message)) + ); + return rit.first->second; +} - return true; +bool Output::filter(uint64_t cur_message) { + m_cur_message = cur_message; + m_extracted_unstructured_arrays.clear(); + return evaluate(m_expr.get(), m_schema); } -bool Output::evaluate( - Expression* expr, - int32_t schema, - std::map>& extracted_values -) { +bool Output::evaluate(Expression* expr, int32_t schema) { if (m_expression_value == EvaluatedValue::True) { return true; } @@ -233,13 +222,9 @@ bool Output::evaluate( } case ExpressionType::Filter: if (static_cast(cur)->get_column()->is_pure_wildcard()) { - ret = evaluate_wildcard_filter( - static_cast(cur), - schema, - extracted_values - ); + ret = evaluate_wildcard_filter(static_cast(cur), schema); } else { - ret = evaluate_filter(static_cast(cur), schema, extracted_values); + ret = evaluate_filter(static_cast(cur), schema); } break; case 
ExpressionType::Or: @@ -271,64 +256,53 @@ bool Output::evaluate( return ret; } -bool Output::evaluate_wildcard_filter( - FilterExpr* expr, - int32_t schema, - std::map>& extracted_values -) { +bool Output::evaluate_wildcard_filter(FilterExpr* expr, int32_t schema) { auto literal = expr->get_operand(); auto* column = expr->get_column().get(); Query* q = m_expr_clp_query[expr]; std::unordered_set* matching_vars = m_expr_var_match_map[expr]; auto op = expr->get_operation(); - for (int32_t column_id : m_wildcard_to_searched_clpstrings[column]) { - if (evaluate_clp_string_filter(op, q, column_id, literal, extracted_values)) { - return true; + if (column->matches_type(LiteralType::ClpStringT)) { + for (auto entry : m_clp_string_readers) { + if (evaluate_clp_string_filter(op, q, entry.second, literal)) { + return true; + } } } - for (int32_t column_id : m_wildcard_to_searched_varstrings[column]) { - if (evaluate_var_string_filter(op, m_var_string_readers[column_id], matching_vars, literal)) - { - return true; + if (column->matches_type(LiteralType::VarStringT)) { + for (auto entry : m_var_string_readers) { + if (evaluate_var_string_filter(op, entry.second, matching_vars, literal)) { + return true; + } } } - for (int32_t column_id : m_wildcard_to_searched_datestrings[column]) { - if (evaluate_epoch_date_filter(op, m_datestring_readers[column_id], literal)) { - return true; + if (column->matches_type(LiteralType::EpochDateT)) { + for (auto entry : m_datestring_readers) { + if (evaluate_epoch_date_filter(op, entry.second, literal)) { + return true; + } } } m_maybe_number = expr->get_column()->matches_type(LiteralType::FloatT); - for (int32_t column_id : m_wildcard_to_searched_columns[column]) { + for (int32_t column_id : m_wildcard_to_searched_basic_columns[column]) { bool ret = false; - switch (node_to_literal_type(m_schema_tree->get_node(column_id)->get_type())) { + switch (node_to_literal_type(m_schema_tree->get_node(column_id).get_type())) { case LiteralType::IntegerT: - 
ret = evaluate_int_filter( - op, - std::get(extracted_values[column_id]), - literal - ); + ret = evaluate_int_filter(op, column_id, literal); break; case LiteralType::FloatT: - ret = evaluate_float_filter( - op, - std::get(extracted_values[column_id]), - literal - ); + ret = evaluate_float_filter(op, column_id, literal); break; case LiteralType::BooleanT: - ret = evaluate_bool_filter( - op, - std::get(extracted_values[column_id]), - literal - ); + ret = evaluate_bool_filter(op, column_id, literal); break; case LiteralType::ArrayT: ret = evaluate_wildcard_array_filter( op, - std::get(extracted_values[column_id]), + get_cached_decompressed_unstructured_array(column_id), literal ); break; @@ -344,61 +318,40 @@ bool Output::evaluate_wildcard_filter( return false; } -bool Output::evaluate_filter( - FilterExpr* expr, - int32_t schema, - std::map>& extracted_values -) { +bool Output::evaluate_filter(FilterExpr* expr, int32_t schema) { auto column = expr->get_column().get(); int32_t column_id = column->get_column_id(); auto literal = expr->get_operand(); Query* q = nullptr; - ClpStringColumnReader* clp_reader = nullptr; - VariableStringColumnReader* var_reader = nullptr; std::unordered_set* matching_vars = nullptr; switch (column->get_literal_type()) { case LiteralType::IntegerT: - return evaluate_int_filter( - expr->get_operation(), - std::get(extracted_values[column_id]), - literal - ); + return evaluate_int_filter(expr->get_operation(), column_id, literal); case LiteralType::FloatT: - return evaluate_float_filter( - expr->get_operation(), - std::get(extracted_values[column_id]), - literal - ); + return evaluate_float_filter(expr->get_operation(), column_id, literal); case LiteralType::ClpStringT: q = m_expr_clp_query[expr]; - clp_reader = m_clp_string_readers[column_id]; return evaluate_clp_string_filter( expr->get_operation(), q, - column_id, - literal, - extracted_values + m_clp_string_readers[column_id], + literal ); case LiteralType::VarStringT: - var_reader = 
m_var_string_readers[column_id]; matching_vars = m_expr_var_match_map.at(expr); return evaluate_var_string_filter( expr->get_operation(), - var_reader, + m_var_string_readers[column_id], matching_vars, literal ); case LiteralType::BooleanT: - return evaluate_bool_filter( - expr->get_operation(), - std::get(extracted_values[column_id]), - literal - ); + return evaluate_bool_filter(expr->get_operation(), column_id, literal); case LiteralType::ArrayT: return evaluate_array_filter( expr->get_operation(), column->get_unresolved_tokens(), - std::get(extracted_values[column_id]), + get_cached_decompressed_unstructured_array(column_id), literal ); case LiteralType::EpochDateT: @@ -417,7 +370,7 @@ bool Output::evaluate_filter( bool Output::evaluate_int_filter( FilterOperation op, - int64_t value, + int32_t column_id, std::shared_ptr const& operand ) { if (FilterOperation::EXISTS == op || FilterOperation::NEXISTS == op) { @@ -429,19 +382,29 @@ bool Output::evaluate_int_filter( return false; } + for (BaseColumnReader* reader : m_basic_readers[column_id]) { + int64_t value = std::get(reader->extract_value(m_cur_message)); + if (evaluate_int_filter_core(op, value, op_value)) { + return true; + } + } + return false; +} + +bool Output::evaluate_int_filter_core(FilterOperation op, int64_t value, int64_t operand) { switch (op) { case FilterOperation::EQ: - return value == op_value; + return value == operand; case FilterOperation::NEQ: - return value != op_value; + return value != operand; case FilterOperation::LT: - return value < op_value; + return value < operand; case FilterOperation::GT: - return value > op_value; + return value > operand; case FilterOperation::LTE: - return value <= op_value; + return value <= operand; case FilterOperation::GTE: - return value >= op_value; + return value >= operand; default: return false; } @@ -449,7 +412,7 @@ bool Output::evaluate_int_filter( bool Output::evaluate_float_filter( FilterOperation op, - double value, + int32_t column_id, 
std::shared_ptr const& operand ) { if (FilterOperation::EXISTS == op || FilterOperation::NEXISTS == op) { @@ -461,19 +424,29 @@ bool Output::evaluate_float_filter( return false; } + for (BaseColumnReader* reader : m_basic_readers[column_id]) { + double value = std::get(reader->extract_value(m_cur_message)); + if (evaluate_float_filter_core(op, value, op_value)) { + return true; + } + } + return false; +} + +bool Output::evaluate_float_filter_core(FilterOperation op, double value, double operand) { switch (op) { case FilterOperation::EQ: - return value == op_value; + return value == operand; case FilterOperation::NEQ: - return value != op_value; + return value != operand; case FilterOperation::LT: - return value < op_value; + return value < operand; case FilterOperation::GT: - return value > op_value; + return value > operand; case FilterOperation::LTE: - return value <= op_value; + return value <= operand; case FilterOperation::GTE: - return value >= op_value; + return value >= operand; default: return false; } @@ -482,9 +455,8 @@ bool Output::evaluate_float_filter( bool Output::evaluate_clp_string_filter( FilterOperation op, Query* q, - int32_t column_id, - std::shared_ptr const& operand, - std::map>& extracted_values + std::vector const& readers, + std::shared_ptr const& operand ) { if (FilterOperation::EXISTS == op || FilterOperation::NEXISTS == op) { return true; @@ -494,45 +466,41 @@ bool Output::evaluate_clp_string_filter( return false; } - auto* reader = m_clp_string_readers[column_id]; - int64_t id = reader->get_encoded_id(m_cur_message); - bool matched = false; - if (q->search_string_matches_all()) { return op == FilterOperation::EQ; } - auto vars = reader->get_encoded_vars(m_cur_message); - for (auto const& subquery : q->get_sub_queries()) { - if (subquery.matches_logtype(id) && subquery.matches_vars(vars)) { - matched = true; - - if (subquery.wildcard_match_required()) { - std::string decompressed_message - = 
std::get(reader->extract_value(m_cur_message)); - matched = StringUtils::wildcard_match_unsafe( - decompressed_message, - q->get_search_string(), - !q->get_ignore_case() - ); - matched = (op == FilterOperation::EQ) == matched; - if (matched) { - extracted_values[column_id] = std::move(decompressed_message); - m_cached_string_columns.insert(column_id); + bool matched = false; + for (ClpStringColumnReader* reader : readers) { + int64_t id = reader->get_encoded_id(m_cur_message); + auto vars = reader->get_encoded_vars(m_cur_message); + for (auto const& subquery : q->get_sub_queries()) { + if (subquery.matches_logtype(id) && subquery.matches_vars(vars)) { + if (subquery.wildcard_match_required()) { + std::string decompressed_message + = std::get(reader->extract_value(m_cur_message)); + matched = StringUtils::wildcard_match_unsafe( + decompressed_message, + q->get_search_string(), + !q->get_ignore_case() + ); + } else { + matched = true; } - return matched; + break; } + } - break; + if ((op == FilterOperation::EQ) == matched) { + return true; } } - - return (op == FilterOperation::EQ) == matched; + return false; } bool Output::evaluate_var_string_filter( FilterOperation op, - VariableStringColumnReader* reader, + std::vector const& readers, std::unordered_set* matching_vars, std::shared_ptr const& operand ) const { @@ -540,16 +508,19 @@ bool Output::evaluate_var_string_filter( return true; } - int64_t id = reader->get_variable_id(m_cur_message); - bool matched = matching_vars->count(id); - switch (op) { - case FilterOperation::EQ: - return matched; - case FilterOperation::NEQ: - return !matched; - default: - return false; + if (FilterOperation::EQ != op && FilterOperation::NEQ != op) { + return false; + } + + for (VariableStringColumnReader* reader : readers) { + int64_t id = reader->get_variable_id(m_cur_message); + bool matched = matching_vars->count(id) > 0; + + if ((FilterOperation::EQ == op) == matched) { + return true; + } } + return false; } bool 
Output::evaluate_array_filter( @@ -861,7 +832,7 @@ bool Output::evaluate_wildcard_array_filter( bool Output::evaluate_bool_filter( FilterOperation op, - bool value, + int32_t column_id, std::shared_ptr const& operand ) { if (FilterOperation::EXISTS == op || FilterOperation::NEXISTS == op) { @@ -873,14 +844,25 @@ bool Output::evaluate_bool_filter( return false; } - switch (op) { - case FilterOperation::EQ: - return value == op_value; - case FilterOperation::NEQ: - return value != op_value; - default: - return false; + bool rvalue = false; + for (BaseColumnReader* reader : m_basic_readers[column_id]) { + bool value = std::get(reader->extract_value(m_cur_message)); + switch (op) { + case FilterOperation::EQ: + rvalue = value == op_value; + break; + case FilterOperation::NEQ: + rvalue = value != op_value; + break; + default: + rvalue = false; + break; + } + if (rvalue) { + return true; + } } + return false; } void Output::populate_string_queries(std::shared_ptr const& expr) { @@ -974,47 +956,31 @@ void Output::populate_searched_wildcard_columns(std::shared_ptr cons if (false == col->is_pure_wildcard()) { return; } + m_wildcard_columns.push_back(col); + LiteralTypeBitmask matching_types{0}; for (int32_t node : (*m_schemas)[m_schema]) { - auto tree_node_type = m_schema_tree->get_node(node)->get_type(); + if (Schema::schema_entry_is_unordered_object(node)) { + continue; + } + auto tree_node_type = m_schema_tree->get_node(node).get_type(); if (col->matches_type(node_to_literal_type(tree_node_type))) { - if (tree_node_type == NodeType::CLPSTRING) { - m_wildcard_to_searched_clpstrings[col].push_back(node); - } else if (tree_node_type == NodeType::VARSTRING) { - m_wildcard_to_searched_varstrings[col].push_back(node); - } else if (tree_node_type == NodeType::DATESTRING) { - m_wildcard_to_searched_datestrings[col].push_back(node); - } else { - // Arrays and basic types - m_wildcard_to_searched_columns[col].push_back(node); + auto literal_type = 
node_to_literal_type(tree_node_type); + matching_types |= literal_type; + if (NodeType::ClpString != tree_node_type && NodeType::VarString != tree_node_type + && NodeType::DateString != tree_node_type) + { + m_wildcard_to_searched_basic_columns[col].insert(node); } } } + col->set_matching_types(matching_types); } } void Output::add_wildcard_columns_to_searched_columns() { - for (auto& e : m_wildcard_to_searched_clpstrings) { - for (int32_t node : e.second) { - m_match.add_searched_column_to_schema(m_schema, node); - } - } - - for (auto& e : m_wildcard_to_searched_varstrings) { - for (int32_t node : e.second) { - m_match.add_searched_column_to_schema(m_schema, node); - } - } - - for (auto& e : m_wildcard_to_searched_datestrings) { - for (int32_t node : e.second) { - m_match.add_searched_column_to_schema(m_schema, node); - } - } - - for (auto& e : m_wildcard_to_searched_columns) { - for (int32_t node : e.second) { - m_match.add_searched_column_to_schema(m_schema, node); - } + m_wildcard_type_mask = 0; + for (ColumnDescriptor* wildcard : m_wildcard_columns) { + m_wildcard_type_mask |= wildcard->get_matching_types(); } } @@ -1102,8 +1068,10 @@ Output::constant_propagate(std::shared_ptr const& expr, int32_t sche bool matches_var_string = false; bool has_clp_string = false; bool matches_clp_string = false; - bool has_other = !m_wildcard_to_searched_columns[wildcard].empty() - || !m_wildcard_to_searched_datestrings[wildcard].empty(); + constexpr LiteralTypeBitmask other_types = LiteralType::ArrayT | cIntegralTypes + | LiteralType::NullT | LiteralType::BooleanT + | LiteralType::EpochDateT; + bool has_other = wildcard->matches_any(other_types); std::string filter_string; bool valid = filter->get_operand()->as_var_string(filter_string, filter->get_operation()) @@ -1117,23 +1085,23 @@ Output::constant_propagate(std::shared_ptr const& expr, int32_t sche } if (filter->get_column()->matches_type(LiteralType::ClpStringT)) { m_expr_clp_query[expr.get()] = 
&m_string_query_map.at(filter_string); - has_clp_string = !m_wildcard_to_searched_clpstrings[wildcard].empty(); + has_clp_string = wildcard->matches_type(LiteralType::ClpStringT); matches_clp_string = !m_expr_clp_query.at(expr.get())->get_sub_queries().empty() || m_expr_clp_query.at(expr.get())->search_string_matches_all(); } if (filter->get_column()->matches_type(LiteralType::VarStringT)) { m_expr_var_match_map[expr.get()] = &m_string_var_match_map.at(filter_string); - has_var_string = !m_wildcard_to_searched_varstrings[wildcard].empty(); + has_var_string = wildcard->matches_type(LiteralType::VarStringT); matches_var_string = !m_expr_var_match_map.at(expr.get())->empty(); } if (filter->get_operation() == FilterOperation::EQ) { if (false == matches_clp_string) { - m_wildcard_to_searched_clpstrings[wildcard].clear(); + wildcard->remove_matching_type(LiteralType::ClpStringT); } if (false == matches_var_string) { - m_wildcard_to_searched_varstrings[wildcard].clear(); + wildcard->remove_matching_type(LiteralType::VarStringT); } if (has_other) { @@ -1216,6 +1184,15 @@ bool Output::evaluate_epoch_date_filter( DateStringColumnReader* reader, std::shared_ptr& operand ) { - return evaluate_int_filter(op, reader->get_encoded_time(m_cur_message), operand); + if (FilterOperation::EXISTS == op || FilterOperation::NEXISTS == op) { + return true; + } + + int64_t op_value; + if (false == operand->as_int(op_value, op)) { + return false; + } + + return evaluate_int_filter_core(op, reader->get_encoded_time(m_cur_message), op_value); } } // namespace clp_s::search diff --git a/components/core/src/clp_s/search/Output.hpp b/components/core/src/clp_s/search/Output.hpp index 208d00f02..70e6041e2 100644 --- a/components/core/src/clp_s/search/Output.hpp +++ b/components/core/src/clp_s/search/Output.hpp @@ -63,10 +63,6 @@ class Output : public FilterClass { bool m_should_marshal_records{true}; // variables for the current schema being filtered - std::vector m_searched_columns; - std::vector 
m_other_columns; - std::set m_cached_string_columns; - int32_t m_schema; SchemaReader* m_reader; @@ -82,16 +78,17 @@ class Output : public FilterClass { std::map> m_string_var_match_map; std::unordered_map m_expr_clp_query; std::unordered_map*> m_expr_var_match_map; - std::unordered_map m_clp_string_readers; - std::unordered_map m_var_string_readers; + std::unordered_map> m_clp_string_readers; + std::unordered_map> m_var_string_readers; std::unordered_map m_datestring_readers; + std::unordered_map> m_basic_readers; + std::unordered_map m_extracted_unstructured_arrays; uint64_t m_cur_message; EvaluatedValue m_expression_value; - std::map> m_wildcard_to_searched_clpstrings; - std::map> m_wildcard_to_searched_varstrings; - std::map> m_wildcard_to_searched_datestrings; - std::map> m_wildcard_to_searched_columns; + std::vector m_wildcard_columns; + std::map> m_wildcard_to_searched_basic_columns; + LiteralTypeBitmask m_wildcard_type_mask{0}; std::stack< std::pair, @@ -103,55 +100,53 @@ class Output : public FilterClass { bool m_maybe_string, m_maybe_number; /** - * Initializes the variables. It is init is called once for each schema after which filter - * is called once for every message in the schema + * Initializes the variables. 
Init is called once for each schema after which filter is called + * once for every message in the schema * @param reader * @param schema_id - * @param columns + * @param column_readers */ void init( SchemaReader* reader, int32_t schema_id, - std::unordered_map& columns + std::vector const& column_readers ) override; /** * Evaluates an expression * @param expr * @param schema - * @param extracted_values * @return true if the expression evaluates to true, false otherwise */ - bool evaluate( - Expression* expr, - int32_t schema, - std::map>& extracted_values - ); + bool evaluate(Expression* expr, int32_t schema); /** * Evaluates a filter expression * @param expr * @param schema - * @param extracted_values * @return true if the expression evaluates to true, false otherwise */ - bool evaluate_filter( - FilterExpr* expr, - int32_t schema, - std::map>& extracted_values - ); + bool evaluate_filter(FilterExpr* expr, int32_t schema); /** * Evaluates a wildcard filter expression * @param expr * @param schema - * @param extracted_values * @return true if the expression evaluates to true, false otherwise */ - bool evaluate_wildcard_filter( - FilterExpr* expr, - int32_t schema, - std::map>& extracted_values + bool evaluate_wildcard_filter(FilterExpr* expr, int32_t schema); + + /** + * Evaluates a int filter expression + * @param op + * @param column_id + * @param operand + * @return true if the expression evaluates to true, false otherwise + */ + bool evaluate_int_filter( + FilterOperation op, + int32_t column_id, + std::shared_ptr const& operand ); /** @@ -161,37 +156,43 @@ class Output : public FilterClass { * @param operand * @return true if the expression evaluates to true, false otherwise */ - static bool - evaluate_int_filter(FilterOperation op, int64_t value, std::shared_ptr const& operand); + static bool evaluate_int_filter_core(FilterOperation op, int64_t value, int64_t operand); /** * Evaluates a float filter expression * @param op - * @param value + * @param 
column_id * @param operand * @return true if the expression evaluates to true, false otherwise */ - static bool evaluate_float_filter( + bool evaluate_float_filter( FilterOperation op, - double value, + int32_t column_id, std::shared_ptr const& operand ); + /** + * Evaluates the core of a float filter expression + * @param op + * @param value + * @param operand + * @return true if the expression evaluates to true, false otherwise + */ + static bool evaluate_float_filter_core(FilterOperation op, double value, double operand); + /** * Evaluates a clp string filter expression * @param op * @param q - * @param column_id + * @param readers * @param operand - * @param extracted_values * @return true if the expression evaluates to true, false otherwise */ bool evaluate_clp_string_filter( FilterOperation op, Query* q, - int32_t column_id, - std::shared_ptr const& operand, - std::map>& extracted_values + std::vector const& readers, + std::shared_ptr const& operand ); /** @@ -204,7 +205,7 @@ class Output : public FilterClass { */ bool evaluate_var_string_filter( FilterOperation op, - VariableStringColumnReader* reader, + std::vector const& readers, std::unordered_set* matching_vars, std::shared_ptr const& operand ) const; @@ -330,12 +331,15 @@ class Output : public FilterClass { /** * Evaluates a bool filter expression * @param op - * @param value + * @param column_id * @param operand * @return true if the expression evaluates to true, false otherwise */ - static bool - evaluate_bool_filter(FilterOperation op, bool value, std::shared_ptr const& operand); + bool evaluate_bool_filter( + FilterOperation op, + int32_t column_id, + std::shared_ptr const& operand + ); /** * Populates the string queries @@ -363,11 +367,20 @@ class Output : public FilterClass { */ void add_wildcard_columns_to_searched_columns(); + /** + * Gets the cached decompressed structured array for the current message stored in the column + * column_id. 
Decompressing array fields can be expensive, so this interface allows us to + * decompress lazily, and decompress the field only once. + * + * Note: the string is returned by reference to allow our array search code to adjust the string + * so that we have enough padding for simdjson. + * @param column_id + * @return the string representing the unstructured array stored in the column column_id + */ + std::string& get_cached_decompressed_unstructured_array(int32_t column_id); + // Methods inherited from FilterClass - bool filter( - uint64_t cur_message, - std::map>& extracted_values - ) override; + bool filter(uint64_t cur_message) override; }; } // namespace clp_s::search diff --git a/components/core/src/clp_s/search/SchemaMatch.cpp b/components/core/src/clp_s/search/SchemaMatch.cpp index 83ef44b6a..82c9af866 100644 --- a/components/core/src/clp_s/search/SchemaMatch.cpp +++ b/components/core/src/clp_s/search/SchemaMatch.cpp @@ -1,6 +1,8 @@ #include "SchemaMatch.hpp" #include +#include +#include #include #include "AndExpr.hpp" @@ -71,13 +73,13 @@ std::shared_ptr SchemaMatch::populate_column_mapping(std::shared_ptr // TODO: will have to decide how we wan't to handle multi-column expressions // with unresolved descriptors for (int32_t node_id : m_unresolved_descriptor_to_descriptor[column]) { - auto node = m_tree->get_node(node_id); + auto const* node = &m_tree->get_node(node_id); auto literal_type = node_to_literal_type(node->get_type()); DescriptorList descriptors; while (node->get_id() != m_tree->get_root_node_id()) { // may have to explicitly mark non-regex descriptors.emplace_back(node->get_key_name()); - node = m_tree->get_node(node->get_parent_id()); + node = &m_tree->get_node(node->get_parent_id()); } std::reverse(descriptors.begin(), descriptors.end()); auto resolved_column = ColumnDescriptor::create(descriptors); @@ -95,8 +97,8 @@ std::shared_ptr SchemaMatch::populate_column_mapping(std::shared_ptr bool SchemaMatch::populate_column_mapping(ColumnDescriptor* 
column) { bool matched = false; if (column->is_pure_wildcard()) { - for (auto& node : m_tree->get_nodes()) { - if (column->matches_type(node_to_literal_type(node->get_type()))) { + for (auto const& node : m_tree->get_nodes()) { + if (column->matches_type(node_to_literal_type(node.get_type()))) { // column_to_descriptor_[node->get_id()].insert(column); // At least some node matches; break // Don't use column_to_descriptor_ for pure wildcard columns anyway, so @@ -109,92 +111,140 @@ bool SchemaMatch::populate_column_mapping(ColumnDescriptor* column) { return matched; } - auto root = m_tree->get_node(m_tree->get_root_node_id()); - for (int32_t child_node_id : root->get_children_ids()) { - matched |= populate_column_mapping(column, column->descriptor_begin(), child_node_id); + // TODO: once we start supporting multi-rooted MPTs this (and anything that uses + // get_root_node_id, or assumes root node id is 0) will have to change + auto const& root = m_tree->get_node(m_tree->get_root_node_id()); + for (int32_t child_node_id : root.get_children_ids()) { + matched |= populate_column_mapping(column, child_node_id); } return matched; } -bool SchemaMatch::populate_column_mapping( - ColumnDescriptor* column, - DescriptorList::iterator it, - int32_t node_id, - bool wildcard_special_flag -) { - if (it == column->descriptor_end()) { - return false; +bool SchemaMatch::populate_column_mapping(ColumnDescriptor* column, int32_t node_id) { + /** + * This function is the core of Column Resolution. The general idea is to walk down different + * branches of the mst while advancing an iterator over the column descriptor in step. 
+ * + * There are a few notable edge cases we handle here namely + * 1) wildcard tokens must be allowed to match any number of mst nodes including zero + * 2) mst node entries with no name are automatically accepted, do not advance the token + * iterator, and can be accepted in recursive descent even if the token iterator is at the + * end + */ + using state = std::tuple; + std::priority_queue, std::greater> work_list; + std::set> visited_states; + auto it_start = column->descriptor_begin(); + work_list.emplace(std::make_tuple(0, it_start, node_id)); + // Allow matching a wildcard zero times + if (column->descriptor_end() != it_start && it_start->wildcard()) { + work_list.emplace(std::make_tuple(0, ++it_start, node_id)); } - + int32_t prev_level = 0; bool matched = false; - bool accepted = false, wildcard_accepted = false; - auto cur_node = m_tree->get_node(node_id); - DescriptorToken const& token = *it; - auto next = it; - next++; - - // accept current token - if (token.wildcard()) { - accepted = true; - wildcard_accepted = true; - } else if (cur_node->get_key_name() == token.get_token()) { - accepted = true; - } + while (false == work_list.empty()) { + auto& cur = work_list.top(); + auto [cur_depth, cur_it, cur_node_id] = cur; + work_list.pop(); + if (prev_level != cur_depth) { + prev_level = cur_depth; + visited_states.clear(); + } - if (accepted) { - // For array search, users need to specify the full path - if (cur_node->get_type() == NodeType::ARRAY && !column->is_unresolved_descriptor()) { - matched = true; - column->add_unresolved_tokens(next); - m_column_to_descriptor[node_id].insert(column); - } else if ((next == column->descriptor_end() - && column->matches_type(node_to_literal_type(cur_node->get_type())))) + // Make sure we haven't visited this state yet via different routes of resolving wildcards + auto cur_state = std::make_pair(cur_it, cur_node_id); + if (visited_states.count(cur_state) > 0) { + continue; + } + visited_states.emplace(cur_state); + + 
// Check if the current node is accepted + auto const& cur_node = m_tree->get_node(cur_node_id); + bool is_key_name_empty = cur_node.get_key_name().empty(); + bool at_descriptor_list_end = cur_it == column->descriptor_end(); + auto next_it = cur_it; + if (false == at_descriptor_list_end) { + ++next_it; + } + bool next_at_descriptor_list_end = next_it == column->descriptor_end(); + bool wildcard_descriptor = false == at_descriptor_list_end && cur_it->wildcard(); + bool accepted = false; + + if (wildcard_descriptor) { + accepted = true; + } else if (is_key_name_empty) { + accepted = true; + } else if ((false == at_descriptor_list_end + && cur_node.get_key_name() == cur_it->get_token())) { - // potentially match current node if accepted its token + accepted = true; + } + + // Check if the current node is matched + if (false == accepted) { + continue; + } + + // Currently we only allow fully resolved descriptors for precise array search + if (NodeType::UnstructuredArray == cur_node.get_type() + && false == column->is_unresolved_descriptor()) + { + /** + * TODO: This doesn't work in general, but it had the same limitation in the previous + * implementation, so I will leave it broken for now. + * + * E.g. 
breaks for a query like `a.b.c:d` on the collection of objects + * {"a": [{"b": {"c": "d"}}]} + * {"a": {"b": [{"c": "d"}]}} + */ + column->add_unresolved_tokens(next_it); + m_column_to_descriptor[cur_node_id].insert(column); matched = true; + continue; + } else if ((next_at_descriptor_list_end + && column->matches_type(node_to_literal_type(cur_node.get_type())))) + { if (false == column->is_unresolved_descriptor()) { - m_column_to_descriptor[node_id].insert(column); + m_column_to_descriptor[cur_node_id].insert(column); } else { - m_unresolved_descriptor_to_descriptor[column].insert(node_id); + m_unresolved_descriptor_to_descriptor[column].insert(cur_node_id); } + matched = true; + continue; } - } else { - return matched; - } - // handle wildcard match 0 case - bool wildcard_special_continue = (wildcard_special_flag || !wildcard_accepted) - && next != column->descriptor_end() && next->wildcard(); - if (wildcard_special_continue) { - // have to allow matching current node again to honour - // 0 or more matches. 
Set the wildcard special flag to avoid matching - // the following case erroneously - // tok.*.tok - matched |= populate_column_mapping(column, next, node_id, true); - } else if (false == wildcard_special_flag && wildcard_accepted) { - matched |= populate_column_mapping(column, next, node_id); - } + // Allow matching a wildcard zero times + if (false == next_at_descriptor_list_end && next_it->wildcard()) { + work_list.emplace(std::make_tuple(cur_depth, next_it, cur_node_id)); + } - // match against children - for (int32_t child_node_id : cur_node->get_children_ids()) { - if (wildcard_accepted && !wildcard_special_continue) { - matched |= populate_column_mapping(column, next, child_node_id); - matched |= populate_column_mapping(column, it, child_node_id); - } else if (false == wildcard_accepted) { - matched |= populate_column_mapping(column, next, child_node_id); + // Push nodes to the work list + for (int32_t child_node_id : cur_node.get_children_ids()) { + if (is_key_name_empty) { + // Don't advance the iterator when accepting an empty key + work_list.emplace(std::make_tuple(cur_depth + 1, cur_it, child_node_id)); + } else { + work_list.emplace(std::make_tuple(cur_depth + 1, next_it, child_node_id)); + if (wildcard_descriptor) { + // Allow matching a wildcard token multiple times + work_list.emplace(std::make_tuple(cur_depth + 1, cur_it, child_node_id)); + } + } } } - return matched; } void SchemaMatch::populate_schema_mapping() { - // TODO: consider refactoring this now that schemas are std::set s + // TODO: consider refactoring this to take advantage of the ordered region of the schema for (auto& it : *m_schemas) { int32_t schema_id = it.first; for (int32_t column_id : it.second) { - if (m_tree->get_node(column_id)->get_type() == NodeType::ARRAY) { + if (Schema::schema_entry_is_unordered_object(column_id)) { + continue; + } + if (NodeType::UnstructuredArray == m_tree->get_node(column_id).get_type()) { m_array_schema_ids.insert(schema_id); } if (false == 
m_column_to_descriptor.count(column_id)) { @@ -232,7 +282,7 @@ std::shared_ptr SchemaMatch::intersect_schemas(std::shared_ptrget_node(m_descriptor_to_schema[column][schema])->get_type() + m_tree->get_node(m_descriptor_to_schema[column][schema]).get_type() ); } } @@ -353,7 +403,10 @@ void SchemaMatch::split_expression_by_schema( && 0 == m_array_search_schema_ids.count(schema_id))) { for (auto column_id : (*m_schemas)[schema_id]) { - if (m_tree->get_node(column_id)->get_type() == NodeType::ARRAY) { + if (Schema::schema_entry_is_unordered_object(column_id)) { + continue; + } + if (NodeType::UnstructuredArray == m_tree->get_node(column_id).get_type()) { m_array_search_schema_ids.insert(schema_id); break; } @@ -442,7 +495,7 @@ bool SchemaMatch::has_array_search(int32_t schema_id) { LiteralType SchemaMatch::get_literal_type_for_column(ColumnDescriptor* column, int32_t schema) { return node_to_literal_type( - m_tree->get_node(get_column_id_for_descriptor(column, schema))->get_type() + m_tree->get_node(get_column_id_for_descriptor(column, schema)).get_type() ); } diff --git a/components/core/src/clp_s/search/SchemaMatch.hpp b/components/core/src/clp_s/search/SchemaMatch.hpp index abee8628b..65c15499f 100644 --- a/components/core/src/clp_s/search/SchemaMatch.hpp +++ b/components/core/src/clp_s/search/SchemaMatch.hpp @@ -88,17 +88,10 @@ class SchemaMatch : public Transformation { /** * Populates the column mapping for a given column * @param column - * @param it * @param node_id - * @param wildcard_special_flag * @return true if matching is successful, false otherwise */ - bool populate_column_mapping( - ColumnDescriptor* column, - DescriptorList::iterator it, - int32_t node_id, - bool wildcard_special_flag = false - ); + bool populate_column_mapping(ColumnDescriptor* column, int32_t node_id); /** * Populates the column mapping for a given column diff --git a/components/core/src/clp_s/search/SearchUtils.cpp b/components/core/src/clp_s/search/SearchUtils.cpp index 
7121ddd04..3f7c522cb 100644 --- a/components/core/src/clp_s/search/SearchUtils.cpp +++ b/components/core/src/clp_s/search/SearchUtils.cpp @@ -18,23 +18,23 @@ void splice_into( // TODO: make sure to handle Object types correctly LiteralType node_to_literal_type(NodeType type) { switch (type) { - case NodeType::INTEGER: + case NodeType::Integer: return LiteralType::IntegerT; - case NodeType::FLOAT: + case NodeType::Float: return LiteralType::FloatT; - case NodeType::CLPSTRING: + case NodeType::ClpString: return LiteralType::ClpStringT; - case NodeType::VARSTRING: + case NodeType::VarString: return LiteralType::VarStringT; - case NodeType::BOOLEAN: + case NodeType::Boolean: return LiteralType::BooleanT; - case NodeType::ARRAY: + case NodeType::UnstructuredArray: return LiteralType::ArrayT; - case NodeType::NULLVALUE: + case NodeType::NullValue: return LiteralType::NullT; - case NodeType::DATESTRING: + case NodeType::DateString: return LiteralType::EpochDateT; - case NodeType::UNKNOWN: + case NodeType::Unknown: default: return LiteralType::UnknownT; } diff --git a/components/core/src/clp_s/search/clp_search/Query.cpp b/components/core/src/clp_s/search/clp_search/Query.cpp index 507d7a0da..abeb86f28 100644 --- a/components/core/src/clp_s/search/clp_search/Query.cpp +++ b/components/core/src/clp_s/search/clp_search/Query.cpp @@ -110,7 +110,7 @@ bool SubQuery::matches_logtype(logtype_dictionary_id_t const logtype) const { return m_possible_logtype_ids.count(logtype) > 0; } -bool SubQuery::matches_vars(Span vars) const { +bool SubQuery::matches_vars(UnalignedMemSpan vars) const { if (vars.size() < m_vars.size()) { // Not enough variables to satisfy query return false; diff --git a/components/core/src/clp_s/search/clp_search/Query.hpp b/components/core/src/clp_s/search/clp_search/Query.hpp index daba27dcc..dd6bf812c 100644 --- a/components/core/src/clp_s/search/clp_search/Query.hpp +++ b/components/core/src/clp_s/search/clp_search/Query.hpp @@ -137,7 +137,7 @@ class 
SubQuery { * @param vars * @return true if matched, false otherwise */ - bool matches_vars(Span vars) const; + bool matches_vars(UnalignedMemSpan vars) const; private: // Variables diff --git a/components/core/src/clp_s/search/kql/CMakeLists.txt b/components/core/src/clp_s/search/kql/CMakeLists.txt index 385bd6571..ee36ee124 100644 --- a/components/core/src/clp_s/search/kql/CMakeLists.txt +++ b/components/core/src/clp_s/search/kql/CMakeLists.txt @@ -23,6 +23,6 @@ add_library( kql.cpp kql.hpp ) -target_compile_features(kql PRIVATE cxx_std_17) +target_compile_features(kql PRIVATE cxx_std_20) target_include_directories(kql PRIVATE ${ANTLR_KqlParser_OUTPUT_DIR}) target_link_libraries(kql PRIVATE antlr4_static Boost::filesystem) diff --git a/components/core/src/glt/glt/CMakeLists.txt b/components/core/src/glt/glt/CMakeLists.txt index a29e7c1c0..0c7a6af4a 100644 --- a/components/core/src/glt/glt/CMakeLists.txt +++ b/components/core/src/glt/glt/CMakeLists.txt @@ -173,7 +173,7 @@ set( ) add_executable(glt ${GLT_SOURCES}) -target_compile_features(glt PRIVATE cxx_std_17) +target_compile_features(glt PRIVATE cxx_std_20) target_include_directories(glt PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(glt PRIVATE diff --git a/components/core/src/glt/string_utils/CMakeLists.txt b/components/core/src/glt/string_utils/CMakeLists.txt index bbfde63ea..3759938e5 100644 --- a/components/core/src/glt/string_utils/CMakeLists.txt +++ b/components/core/src/glt/string_utils/CMakeLists.txt @@ -9,4 +9,4 @@ add_library( ) add_library(clp::string_utils ALIAS string_utils) target_include_directories(string_utils PUBLIC ../) -target_compile_features(string_utils PRIVATE cxx_std_17) +target_compile_features(string_utils PRIVATE cxx_std_20) diff --git a/components/core/src/reducer/BufferedSocketWriter.hpp b/components/core/src/reducer/BufferedSocketWriter.hpp index 56521fff6..4eea3c4f8 100644 --- a/components/core/src/reducer/BufferedSocketWriter.hpp +++ 
b/components/core/src/reducer/BufferedSocketWriter.hpp @@ -27,7 +27,7 @@ class BufferedSocketWriter { template < typename T, typename = std::enable_if_t< - std::is_pod::value + std::is_trivial_v && std::is_standard_layout_v && std::is_trivially_copyable_v && sizeof(T) == sizeof(char)>> bool write(std::vector const& data) { return write(reinterpret_cast(data.data()), data.size()); diff --git a/components/core/src/reducer/CMakeLists.txt b/components/core/src/reducer/CMakeLists.txt index e49749a41..16958da74 100644 --- a/components/core/src/reducer/CMakeLists.txt +++ b/components/core/src/reducer/CMakeLists.txt @@ -39,7 +39,7 @@ set( ) add_executable(reducer-server ${REDUCER_SOURCES}) -target_compile_features(reducer-server PRIVATE cxx_std_17) +target_compile_features(reducer-server PRIVATE cxx_std_20) target_include_directories(reducer-server PRIVATE "${PROJECT_SOURCE_DIR}/submodules") target_link_libraries(reducer-server PRIVATE diff --git a/docs/src/dev-guide/components-core/index.md b/docs/src/dev-guide/components-core/index.md index a78bd03af..0e9fd2623 100644 --- a/docs/src/dev-guide/components-core/index.md +++ b/docs/src/dev-guide/components-core/index.md @@ -6,7 +6,7 @@ CLP core is the low-level component that performs compression, decompression, an * We have built and tested CLP on the OSes listed [below](#native-environment). * If you have trouble building for another OS, file an issue, and we may be able to help. -* A compiler that supports C++17 and std::span (e.g., gcc-10) +* A compiler that supports C++20 and std::span (e.g., gcc-10) To build, we require some source dependencies, packages from package managers, and libraries built from source.