diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index cddcd9866..5b1af4d0a 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -2,6 +2,7 @@ #define DICTIONARYWRITER_HPP // C++ standard libraries +#include #include #include #include @@ -11,6 +12,7 @@ // Project headers #include "Defs.h" +#include "dictionary_utils.hpp" #include "FileWriter.hpp" #include "streaming_compression/zstd/Compressor.hpp" #include "TraceableException.hpp" @@ -38,7 +40,7 @@ class DictionaryWriter { // Constructors DictionaryWriter () : m_is_open(false) {} - ~DictionaryWriter (); + ~DictionaryWriter () = default; // Methods /** @@ -53,16 +55,17 @@ class DictionaryWriter { void close (); /** - * Gets the entry with the given ID - * @param id - * @return Pointer to the entry with the given ID + * Writes the dictionary's header and flushes unwritten content to disk */ - const EntryType* get_entry (DictionaryIdType id) const; + void write_header_and_flush_to_disk (); /** - * Writes uncommitted dictionary entries to file + * Opens dictionary, loads entries, and then sets it up for writing + * @param dictionary_path + * @param segment_index_path + * @param max_id */ - void write_uncommitted_entries_to_disk (); + void open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, variable_dictionary_id_t max_id); /** * Adds the given segment and IDs to the segment index @@ -85,22 +88,19 @@ class DictionaryWriter { protected: // Types - typedef std::unordered_map value_to_entry_t; - typedef std::unordered_map id_to_entry_t; + typedef std::unordered_map value_to_id_t; // Variables bool m_is_open; // Variables related to on-disk storage - std::vector m_uncommitted_entries; FileWriter m_dictionary_file_writer; streaming_compression::zstd::Compressor m_dictionary_compressor; FileWriter m_segment_index_file_writer; streaming_compression::zstd::Compressor m_segment_index_compressor; size_t m_num_segments_in_index; - value_to_entry_t m_value_to_entry; - id_to_entry_t m_id_to_entry; + value_to_id_t m_value_to_id; DictionaryIdType m_next_id; DictionaryIdType m_max_id; @@ -108,16 +108,6 @@ class DictionaryWriter { size_t m_data_size; }; -template -DictionaryWriter::~DictionaryWriter () { - if (false == m_uncommitted_entries.empty()) { - SPDLOG_ERROR("DictionaryWriter contains uncommitted entries while being destroyed - possible data loss."); - } - for (const auto& value_entry_pair : m_value_to_entry) { - delete value_entry_pair.second; - } -} - template void DictionaryWriter::open (const std::string& dictionary_path, const std::string& segment_index_path, DictionaryIdType max_id) { if (m_is_open) { @@ -151,60 +141,89 @@ void DictionaryWriter::close () { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } - write_uncommitted_entries_to_disk(); + write_header_and_flush_to_disk(); m_segment_index_compressor.close(); m_segment_index_file_writer.close(); m_dictionary_compressor.close(); m_dictionary_file_writer.close(); - // Delete entries and clear maps - for (const auto& value_entry_pair : m_value_to_entry) { - delete value_entry_pair.second; - } - m_id_to_entry.clear(); - m_value_to_entry.clear(); + m_value_to_id.clear(); m_is_open = false; } template -const EntryType* DictionaryWriter::get_entry (DictionaryIdType id) const { +void DictionaryWriter::write_header_and_flush_to_disk () { if (false == m_is_open) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } - if (m_id_to_entry.count(id) == 0) { - throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); - } - return m_id_to_entry.at(id); + // Update header + auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos(); + m_dictionary_file_writer.seek_from_begin(0); + m_dictionary_file_writer.write_numeric_value(m_value_to_id.size()); + m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos); + + m_segment_index_compressor.flush(); + m_segment_index_file_writer.flush(); + m_dictionary_compressor.flush(); + m_dictionary_file_writer.flush(); } template -void DictionaryWriter::write_uncommitted_entries_to_disk () { - if (false == m_is_open) { - throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); +void DictionaryWriter::open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, + const variable_dictionary_id_t max_id) +{ + if (m_is_open) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); } - if (m_uncommitted_entries.empty()) { - // Nothing to do - return; + m_max_id = max_id; + + FileReader dictionary_file_reader; + streaming_compression::zstd::Decompressor dictionary_decompressor; + FileReader segment_index_file_reader; + streaming_compression::zstd::Decompressor segment_index_decompressor; + constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB + open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, + segment_index_file_reader, segment_index_decompressor); + + auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); + if (num_dictionary_entries > m_max_id) { + SPDLOG_ERROR("DictionaryWriter ran out of IDs."); + throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__); } + // Loads entries from the given dictionary file + EntryType entry; + for (size_t i = 0; i < num_dictionary_entries; ++i) { + entry.clear(); + entry.read_from_file(dictionary_decompressor); + const auto& str_value = entry.get_value(); + if (m_value_to_id.count(str_value)) { + SPDLOG_ERROR("Entry's value already exists in dictionary"); + throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); + } - for (auto entry : m_uncommitted_entries) { - entry->write_to_file(m_dictionary_compressor); + m_value_to_id[str_value] = entry.get_id();; + m_data_size += entry.get_data_size(); } - // Update header - auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos(); - m_dictionary_file_writer.seek_from_begin(0); - m_dictionary_file_writer.write_numeric_value(m_id_to_entry.size()); - m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos); + m_next_id = num_dictionary_entries; - m_segment_index_compressor.flush(); - m_segment_index_file_writer.flush(); - m_dictionary_compressor.flush(); - m_dictionary_file_writer.flush(); - m_uncommitted_entries.clear(); + segment_index_decompressor.close(); + segment_index_file_reader.close(); + dictionary_decompressor.close(); + dictionary_file_reader.close(); + + m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); + // Open compressor + m_dictionary_compressor.open(m_dictionary_file_writer); + + m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); + // Open compressor + m_segment_index_compressor.open(m_segment_index_file_writer); + + m_is_open = true; } template diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index 1e26fda7e..ff55250b6 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -203,7 +203,8 @@ void EncodedVariableInterpreter::convert_encoded_double_to_string (encoded_varia } void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& message, LogTypeDictionaryEntry& logtype_dict_entry, - VariableDictionaryWriter& var_dict, vector& encoded_vars) + VariableDictionaryWriter& var_dict, vector& encoded_vars, + vector& var_ids) { // Extract all variables and add to dictionary while building logtype size_t tok_begin_pos = 0; @@ -223,8 +224,9 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes } else { // Variable string looks like a dictionary variable, so encode it as so variable_dictionary_id_t id; - var_dict.add_occurrence(var_str, id); + var_dict.add_entry(var_str, id); encoded_var = encode_var_dict_id(id); + var_ids.push_back(id); logtype_dict_entry.add_non_double_var(); } diff --git a/components/core/src/EncodedVariableInterpreter.hpp b/components/core/src/EncodedVariableInterpreter.hpp index 551f2bbc7..4c0cb6db1 100644 --- a/components/core/src/EncodedVariableInterpreter.hpp +++ b/components/core/src/EncodedVariableInterpreter.hpp @@ -75,9 +75,10 @@ class EncodedVariableInterpreter { * @param logtype_dict_entry * @param var_dict * @param encoded_vars + * @param var_ids */ static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict, - std::vector& encoded_vars); + std::vector& encoded_vars, std::vector& var_ids); /** * Decodes all variables and decompresses them into a message * @param logtype_dict_entry diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index 6b01228e6..1960a1e46 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -5,62 +5,17 @@ using std::string; -void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, logtype_dictionary_id_t max_id) { - if (m_is_open) { - throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); - } - - m_max_id = max_id; - - FileReader dictionary_file_reader; - streaming_compression::zstd::Decompressor dictionary_decompressor; - FileReader segment_index_file_reader; - streaming_compression::zstd::Decompressor segment_index_decompressor; - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, - segment_index_file_reader, segment_index_decompressor); - - auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); - - // Read new dictionary entries - logtype_dictionary_id_t id; - for (size_t i = 0; i < num_dictionary_entries; ++i) { - auto logtype_dict_entry_wrapper = std::make_unique(); - logtype_dict_entry_wrapper->read_from_file(dictionary_decompressor); - add_occurrence(logtype_dict_entry_wrapper, id); - } - m_uncommitted_entries.clear(); - - segment_index_decompressor.close(); - segment_index_file_reader.close(); - dictionary_decompressor.close(); - dictionary_file_reader.close(); - - m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_dictionary_compressor.open(m_dictionary_file_writer); - - m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_segment_index_compressor.open(m_segment_index_file_writer); - - m_is_open = true; -} - -bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr& entry_wrapper, logtype_dictionary_id_t& logtype_id) { - if (nullptr == entry_wrapper) { - throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); - } - auto& entry = *entry_wrapper; - +bool LogTypeDictionaryWriter::add_entry (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) { bool is_new_entry = false; - const string& value = entry.get_value(); - const auto ix = m_value_to_entry.find(value); - if (m_value_to_entry.end() != ix) { - // Entry exists so increment its count - auto& existing_entry = ix->second; - logtype_id = existing_entry->get_id(); + const string& value = logtype_entry.get_value(); + if (value.empty()) { + throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); + } + const auto ix = m_value_to_id.find(value); + if (m_value_to_id.end() != ix) { + // Entry exists so get its ID + logtype_id = ix->second; } else { // Dictionary entry doesn't exist so create it @@ -81,25 +36,22 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptrget_data_size(); + m_data_size += logtype_entry.get_data_size(); + + logtype_entry.write_to_file(m_dictionary_compressor); } return is_new_entry; } diff --git a/components/core/src/LogTypeDictionaryWriter.hpp b/components/core/src/LogTypeDictionaryWriter.hpp index 413fd5de5..55c5af44a 100644 --- a/components/core/src/LogTypeDictionaryWriter.hpp +++ b/components/core/src/LogTypeDictionaryWriter.hpp @@ -29,20 +29,11 @@ class LogTypeDictionaryWriter : public DictionaryWriter& entry_wrapper, logtype_dictionary_id_t& logtype_id); + bool add_entry (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id); }; #endif // LOGTYPEDICTIONARYWRITER_HPP diff --git a/components/core/src/VariableDictionaryEntry.hpp b/components/core/src/VariableDictionaryEntry.hpp index f480d99d7..f4d45276b 100644 --- a/components/core/src/VariableDictionaryEntry.hpp +++ b/components/core/src/VariableDictionaryEntry.hpp @@ -44,6 +44,8 @@ class VariableDictionaryEntry : public DictionaryEntry */ size_t get_data_size () const; + void clear () { m_value.clear(); } + /** * Writes an entry to file * @param compressor diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index 5a3e9ce41..227260f03 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -6,56 +6,12 @@ // Project headers #include "dictionary_utils.hpp" -using std::string; - -void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, const string& segment_index_path, variable_dictionary_id_t max_id) { - if (m_is_open) { - throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); - } - - m_max_id = max_id; - - FileReader dictionary_file_reader; - streaming_compression::zstd::Decompressor dictionary_decompressor; - FileReader segment_index_file_reader; - streaming_compression::zstd::Decompressor segment_index_decompressor; - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, - segment_index_file_reader, segment_index_decompressor); - - auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); - - // Read new dictionary entries - variable_dictionary_id_t id; - VariableDictionaryEntry var_dict_entry; - for (size_t i = 0; i < num_dictionary_entries; ++i) { - var_dict_entry.read_from_file(dictionary_decompressor); - add_occurrence(var_dict_entry.get_value(), id); - } - m_uncommitted_entries.clear(); - - segment_index_decompressor.close(); - segment_index_file_reader.close(); - dictionary_decompressor.close(); - dictionary_file_reader.close(); - - m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_dictionary_compressor.open(m_dictionary_file_writer); - - m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_segment_index_compressor.open(m_segment_index_file_writer); - - m_is_open = true; -} - -bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dictionary_id_t& id) { +bool VariableDictionaryWriter::add_entry (const std::string& value, variable_dictionary_id_t& id) { bool new_entry = false; - const auto ix = m_value_to_entry.find(value); - if (m_value_to_entry.end() != ix) { - id = ix->second->get_id(); + const auto ix = m_value_to_id.find(value); + if (m_value_to_id.end() != ix) { + id = ix->second; } else { // Entry doesn't exist so create it @@ -69,17 +25,15 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic ++m_next_id; // Insert the ID obtained from the database into the dictionary - auto* entry = new VariableDictionaryEntry(value, id); - m_value_to_entry[value] = entry; - m_id_to_entry[id] = entry; - - // Mark ID as dirty - m_uncommitted_entries.emplace_back(entry); + auto entry = VariableDictionaryEntry(value, id); + m_value_to_id[value] = id; new_entry = true; // TODO: This doesn't account for the segment index that's constantly updated - m_data_size += entry->get_data_size(); + m_data_size += entry.get_data_size(); + + entry.write_to_file(m_dictionary_compressor); } return new_entry; } diff --git a/components/core/src/VariableDictionaryWriter.hpp b/components/core/src/VariableDictionaryWriter.hpp index d5095703d..329d1faf7 100644 --- a/components/core/src/VariableDictionaryWriter.hpp +++ b/components/core/src/VariableDictionaryWriter.hpp @@ -23,21 +23,12 @@ class VariableDictionaryWriter : public DictionaryWriter(); - // Open variable dictionary string var_dict_path = archive_path_string + '/' + cVarDictFilename; string var_dict_segment_index_path = archive_path_string + '/' + cVarSegmentIndexFilename; @@ -209,7 +206,7 @@ namespace streaming_archive { namespace writer { write_dir_snapshot(); m_logtype_dict.close(); - m_logtype_dict_entry_wrapper.reset(nullptr); + m_logtype_dict_entry.clear(); m_var_dict.close(); if (::close(m_segments_dir_fd) != 0) { @@ -265,13 +262,12 @@ namespace streaming_archive { namespace writer { void Archive::write_msg (File& file, epochtime_t timestamp, const string& message, size_t num_uncompressed_bytes) { vector encoded_vars; - EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars); + vector var_ids; + EncodedVariableInterpreter::encode_and_add_to_dictionary(message, m_logtype_dict_entry, m_var_dict, encoded_vars, var_ids); logtype_dictionary_id_t logtype_id; - if (m_logtype_dict.add_occurrence(m_logtype_dict_entry_wrapper, logtype_id)) { - m_logtype_dict_entry_wrapper = make_unique(); - } + m_logtype_dict.add_entry(m_logtype_dict_entry, logtype_id); - file.write_encoded_msg(timestamp, logtype_id, encoded_vars, num_uncompressed_bytes); + file.write_encoded_msg(timestamp, logtype_id, encoded_vars, var_ids, num_uncompressed_bytes); } void Archive::write_dir_snapshot () { @@ -284,8 +280,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_uncommitted_entries_to_disk(); - m_var_dict.write_uncommitted_entries_to_disk(); + m_logtype_dict.write_header_and_flush_to_disk(); + m_var_dict.write_header_and_flush_to_disk(); } void Archive::append_file_to_segment (File*& file, Segment& segment, unordered_set& logtype_ids_in_segment, @@ -369,8 +365,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_uncommitted_entries_to_disk(); - m_var_dict.write_uncommitted_entries_to_disk(); + m_logtype_dict.write_header_and_flush_to_disk(); + m_var_dict.write_header_and_flush_to_disk(); for (auto file : files) { file->mark_as_in_committed_segment(); diff --git a/components/core/src/streaming_archive/writer/Archive.hpp b/components/core/src/streaming_archive/writer/Archive.hpp index 1ec5c6d68..6443cb997 100644 --- a/components/core/src/streaming_archive/writer/Archive.hpp +++ b/components/core/src/streaming_archive/writer/Archive.hpp @@ -243,8 +243,8 @@ namespace streaming_archive { namespace writer { int m_segments_dir_fd; LogTypeDictionaryWriter m_logtype_dict; - // Wrapper to hold logtype dictionary entry that's preallocated for performance - std::unique_ptr m_logtype_dict_entry_wrapper; + // Holds preallocated logtype dictionary entry for performance + LogTypeDictionaryEntry m_logtype_dict_entry; VariableDictionaryWriter m_var_dict; boost::uuids::random_generator m_uuid_generator; diff --git a/components/core/src/streaming_archive/writer/File.cpp b/components/core/src/streaming_archive/writer/File.cpp index 3477a5886..2480f9616 100644 --- a/components/core/src/streaming_archive/writer/File.cpp +++ b/components/core/src/streaming_archive/writer/File.cpp @@ -6,13 +6,14 @@ using std::string; using std::to_string; using std::unordered_set; +using std::vector; namespace streaming_archive { namespace writer { void File::open () { if (m_is_written_out) { throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__); } - + m_variable_ids = std::make_unique>(); m_is_open = true; } @@ -25,9 +26,7 @@ namespace streaming_archive { namespace writer { // Add file's logtype and variable IDs to respective segment sets auto logtype_ids = m_logtypes.data(); - auto variables = m_variables.data(); - append_logtype_and_var_ids_to_segment_sets(logtype_dict, logtype_ids, m_logtypes.size(), variables, m_variables.size(), segment_logtype_ids, - segment_var_ids); + append_logtype_and_var_ids_to_segment_sets(logtype_ids, m_logtypes.size(), segment_logtype_ids, segment_var_ids); // Append files to segment uint64_t segment_timestamps_uncompressed_pos; @@ -39,20 +38,24 @@ namespace streaming_archive { namespace writer { set_segment_metadata(segment.get_id(), segment_timestamps_uncompressed_pos, segment_logtypes_uncompressed_pos, segment_variables_uncompressed_pos); m_segmentation_state = SegmentationState_MovingToSegment; - // Mark file as written out and clear in-memory columns + // Mark file as written out and clear in-memory columns and clear the in-memory data (except metadata) m_is_written_out = true; m_timestamps.clear(); m_logtypes.clear(); m_variables.clear(); + m_variable_ids.reset(nullptr); } - void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - size_t num_uncompressed_bytes) + void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const vector& encoded_vars, + const vector& var_ids, size_t num_uncompressed_bytes) { m_timestamps.push_back(timestamp); m_logtypes.push_back(logtype_id); m_variables.push_back_all(encoded_vars); + // Insert message's variable IDs into the file's variable ID set + m_variable_ids->insert(var_ids.cbegin(), var_ids.cend()); + // Update metadata ++m_num_messages; m_num_variables += encoded_vars.size(); @@ -112,37 +115,17 @@ namespace streaming_archive { namespace writer { return encoded_timestamp_patterns; } - void File::append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, - size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, + void File::append_logtype_and_var_ids_to_segment_sets (const logtype_dictionary_id_t* logtype_ids, size_t num_logtypes, unordered_set& segment_logtype_ids, unordered_set& segment_var_ids) { - size_t var_ix = 0; + // Add logtype IDs for (size_t i = 0; i < num_logtypes; ++i) { - // Add logtype to set auto logtype_id = logtype_ids[i]; segment_logtype_ids.emplace(logtype_id); - - // Get logtype dictionary entry - auto logtype_dict_entry_ptr = logtype_dict.get_entry(logtype_id); - auto& logtype_dict_entry = *logtype_dict_entry_ptr; - - // Get number of variables in logtype - auto msg_num_vars = logtype_dict_entry.get_num_vars(); - if (var_ix + msg_num_vars > num_vars) { - throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); - } - - // If variable is a variable dictionary ID, decode it and add it to the set - for (size_t msg_var_ix = 0; msg_var_ix < msg_num_vars; ++msg_var_ix, ++var_ix) { - if (LogTypeDictionaryEntry::VarDelim::NonDouble == logtype_dict_entry.get_var_delim(msg_var_ix)) { - auto var = vars[var_ix]; - if (EncodedVariableInterpreter::is_var_dict_id(var)) { - segment_var_ids.emplace(EncodedVariableInterpreter::decode_var_dict_id(var)); - } - } - } } + // Add variable IDs + segment_var_ids.insert(m_variable_ids->cbegin(), m_variable_ids->cend()); } void File::set_segment_metadata (segment_id_t segment_id, uint64_t segment_timestamps_uncompressed_pos, uint64_t segment_logtypes_uncompressed_pos, diff --git a/components/core/src/streaming_archive/writer/File.hpp b/components/core/src/streaming_archive/writer/File.hpp index 8b511eefd..a21b6b824 100644 --- a/components/core/src/streaming_archive/writer/File.hpp +++ b/components/core/src/streaming_archive/writer/File.hpp @@ -54,6 +54,7 @@ namespace streaming_archive { namespace writer { m_segment_variables_pos(0), m_is_split(split_ix > 0), m_split_ix(split_ix), + m_variable_ids(nullptr), m_segmentation_state(SegmentationState_NotInSegment), m_is_metadata_clean(false), m_is_written_out(false), @@ -81,10 +82,11 @@ namespace streaming_archive { namespace writer { * @param timestamp * @param logtype_id * @param encoded_vars + * @param var_ids * @param num_uncompressed_bytes */ void write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - size_t num_uncompressed_bytes); + const std::vector& var_ids, size_t num_uncompressed_bytes); /** * Changes timestamp pattern in use at current message in file @@ -174,18 +176,14 @@ namespace streaming_archive { namespace writer { // Methods /** * Takes logtype and variable IDs from a file's logtype and variable columns and appends them to the given sets - * @param logtype_dict * @param logtype_ids * @param num_logtypes - * @param vars - * @param num_vars * @param segment_logtype_ids * @param segment_var_ids */ - static void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, - size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, - std::unordered_set& segment_logtype_ids, - std::unordered_set& segment_var_ids); + void append_logtype_and_var_ids_to_segment_sets (const logtype_dictionary_id_t* logtype_ids, size_t num_logtypes, + std::unordered_set& segment_logtype_ids, + std::unordered_set& segment_var_ids); /** * Sets segment-related metadata to the given values @@ -229,6 +227,7 @@ namespace streaming_archive { namespace writer { PageAllocatedVector m_timestamps; PageAllocatedVector m_logtypes; PageAllocatedVector m_variables; + std::unique_ptr> m_variable_ids; // State variables SegmentationState m_segmentation_state; diff --git a/components/core/tests/test-EncodedVariableInterpreter.cpp b/components/core/tests/test-EncodedVariableInterpreter.cpp index 1790ffd43..35d9d5ae0 100644 --- a/components/core/tests/test-EncodedVariableInterpreter.cpp +++ b/components/core/tests/test-EncodedVariableInterpreter.cpp @@ -241,14 +241,26 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { // Test encoding vector encoded_vars; - vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), "-25.5196868642755", "-00.00"}; + vector var_ids; + vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), + "-25.5196868642755", "-00.00", "bin/python2.7.3"}; msg = "here is a string with a small int " + var_strs[0] + " and a very large int " + var_strs[1] + " and a double " + var_strs[2] + - " and a weird double " + var_strs[3]; + " and a weird double " + var_strs[3] + " and a str with numbers " + var_strs[4]; LogTypeDictionaryEntry logtype_dict_entry; - EncodedVariableInterpreter::encode_and_add_to_dictionary(msg, logtype_dict_entry, var_dict_writer, encoded_vars); - + EncodedVariableInterpreter::encode_and_add_to_dictionary(msg, logtype_dict_entry, var_dict_writer, encoded_vars, var_ids); var_dict_writer.close(); + // Test var_ids is correctly populated + size_t encoded_var_id_ix = 0; + for (const auto& var : encoded_vars) { + if(EncodedVariableInterpreter::is_var_dict_id(var)){ + REQUIRE(var_ids.size() > encoded_var_id_ix); + REQUIRE(EncodedVariableInterpreter::decode_var_dict_id(var) == var_ids[encoded_var_id_ix]); + encoded_var_id_ix++; + } + } + REQUIRE(var_ids.size() == encoded_var_id_ix); + // Open reader VariableDictionaryReader var_dict_reader; var_dict_reader.open(cVarDictPath, cVarSegmentIndexPath); @@ -264,6 +276,8 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { REQUIRE(EncodedVariableInterpreter::encode_and_search_dictionary(var_strs[2], var_dict_reader, false, search_logtype, sub_query)); search_logtype += " and a weird double "; REQUIRE(EncodedVariableInterpreter::encode_and_search_dictionary(var_strs[3], var_dict_reader, false, search_logtype, sub_query)); + search_logtype += " and a str with numbers "; + REQUIRE(EncodedVariableInterpreter::encode_and_search_dictionary(var_strs[4], var_dict_reader, false, search_logtype, sub_query)); auto& vars = sub_query.get_vars(); REQUIRE(vars.size() == encoded_vars.size()); for (size_t i = 0; i < vars.size(); ++i) {