diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index cddcd9866a..61b3e07eba 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -60,9 +60,9 @@ class DictionaryWriter { const EntryType* get_entry (DictionaryIdType id) const; /** - * Writes uncommitted dictionary entries to file + * Writes dict_type count and flush compressors to file */ - void write_uncommitted_entries_to_disk (); + void write_dictionary_info_to_disk (); /** * Adds the given segment and IDs to the segment index @@ -85,22 +85,19 @@ class DictionaryWriter { protected: // Types - typedef std::unordered_map value_to_entry_t; - typedef std::unordered_map id_to_entry_t; + typedef std::unordered_map value_to_id_t; // Variables bool m_is_open; // Variables related to on-disk storage - std::vector m_uncommitted_entries; FileWriter m_dictionary_file_writer; streaming_compression::zstd::Compressor m_dictionary_compressor; FileWriter m_segment_index_file_writer; streaming_compression::zstd::Compressor m_segment_index_compressor; size_t m_num_segments_in_index; - value_to_entry_t m_value_to_entry; - id_to_entry_t m_id_to_entry; + value_to_id_t m_value_to_id; DictionaryIdType m_next_id; DictionaryIdType m_max_id; @@ -110,12 +107,7 @@ class DictionaryWriter { template DictionaryWriter::~DictionaryWriter () { - if (false == m_uncommitted_entries.empty()) { - SPDLOG_ERROR("DictionaryWriter contains uncommitted entries while being destroyed - possible data loss."); - } - for (const auto& value_entry_pair : m_value_to_entry) { - delete value_entry_pair.second; - } + // Do nothing } template @@ -151,60 +143,34 @@ void DictionaryWriter::close () { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } - write_uncommitted_entries_to_disk(); + write_dictionary_info_to_disk(); m_segment_index_compressor.close(); m_segment_index_file_writer.close(); m_dictionary_compressor.close(); m_dictionary_file_writer.close(); - // Delete entries and clear maps - for (const auto& value_entry_pair : m_value_to_entry) { - delete value_entry_pair.second; - } - m_id_to_entry.clear(); - m_value_to_entry.clear(); + // clear maps + m_value_to_id.clear(); m_is_open = false; } template -const EntryType* DictionaryWriter::get_entry (DictionaryIdType id) const { +void DictionaryWriter::write_dictionary_info_to_disk () { if (false == m_is_open) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } - if (m_id_to_entry.count(id) == 0) { - throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); - } - return m_id_to_entry.at(id); -} - -template -void DictionaryWriter::write_uncommitted_entries_to_disk () { - if (false == m_is_open) { - throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); - } - - if (m_uncommitted_entries.empty()) { - // Nothing to do - return; - } - - for (auto entry : m_uncommitted_entries) { - entry->write_to_file(m_dictionary_compressor); - } - // Update header auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos(); m_dictionary_file_writer.seek_from_begin(0); - m_dictionary_file_writer.write_numeric_value(m_id_to_entry.size()); + m_dictionary_file_writer.write_numeric_value(m_value_to_id.size()); m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos); m_segment_index_compressor.flush(); m_segment_index_file_writer.flush(); m_dictionary_compressor.flush(); m_dictionary_file_writer.flush(); - m_uncommitted_entries.clear(); } template diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index 1e26fda7e3..fbf41be5d0 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -203,7 +203,8 @@ void EncodedVariableInterpreter::convert_encoded_double_to_string (encoded_varia } void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& message, LogTypeDictionaryEntry& logtype_dict_entry, - VariableDictionaryWriter& var_dict, vector& encoded_vars) + VariableDictionaryWriter& var_dict, vector& encoded_vars, + vector& added_vars) { // Extract all variables and add to dictionary while building logtype size_t tok_begin_pos = 0; @@ -225,6 +226,7 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes variable_dictionary_id_t id; var_dict.add_occurrence(var_str, id); encoded_var = encode_var_dict_id(id); + added_vars.push_back(id); logtype_dict_entry.add_non_double_var(); } diff --git a/components/core/src/EncodedVariableInterpreter.hpp b/components/core/src/EncodedVariableInterpreter.hpp index 551f2bbc79..b2725394b9 100644 --- a/components/core/src/EncodedVariableInterpreter.hpp +++ b/components/core/src/EncodedVariableInterpreter.hpp @@ -77,7 +77,7 @@ class EncodedVariableInterpreter { * @param encoded_vars */ static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict, - std::vector& encoded_vars); + std::vector& encoded_vars, std::vector& added_vars); /** * Decodes all variables and decompresses them into a message * @param logtype_dict_entry diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index 6b01228e69..d9de8f8e8b 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -29,7 +29,6 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa logtype_dict_entry_wrapper->read_from_file(dictionary_decompressor); add_occurrence(logtype_dict_entry_wrapper, id); } - m_uncommitted_entries.clear(); segment_index_decompressor.close(); segment_index_file_reader.close(); @@ -56,11 +55,10 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptrsecond; - logtype_id = existing_entry->get_id(); + const auto ix = m_value_to_id.find(value); + if (m_value_to_id.end() != ix) { + // Entry exists so get its ID + logtype_id = ix->second; } else { // Dictionary entry doesn't exist so create it @@ -90,16 +88,17 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptrget_data_size(); + + // write the new entry to the compressor and free up the memory + entry_ptr->write_to_file(m_dictionary_compressor); + delete entry_ptr; } return is_new_entry; } diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index 5a3e9ce41b..e8ef5a4ed9 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -32,7 +32,6 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, var_dict_entry.read_from_file(dictionary_decompressor); add_occurrence(var_dict_entry.get_value(), id); } - m_uncommitted_entries.clear(); segment_index_decompressor.close(); segment_index_file_reader.close(); @@ -53,9 +52,9 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dictionary_id_t& id) { bool new_entry = false; - const auto ix = m_value_to_entry.find(value); - if (m_value_to_entry.end() != ix) { - id = ix->second->get_id(); + const auto ix = m_value_to_id.find(value); + if (m_value_to_id.end() != ix) { + id = ix->second; } else { // Entry doesn't exist so create it @@ -70,16 +69,17 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic // Insert the ID obtained from the database into the dictionary auto* entry = new VariableDictionaryEntry(value, id); - m_value_to_entry[value] = entry; - m_id_to_entry[id] = entry; - - // Mark ID as dirty - m_uncommitted_entries.emplace_back(entry); + m_value_to_id[value] = id; new_entry = true; // TODO: This doesn't account for the segment index that's constantly updated + // Nor does it account that compression doesn't store the whole entry in the memory m_data_size += entry->get_data_size(); + + // write the new entry to the compressor and free up the memory + entry->write_to_file(m_dictionary_compressor); + delete entry; } return new_entry; } diff --git a/components/core/src/streaming_archive/writer/Archive.cpp b/components/core/src/streaming_archive/writer/Archive.cpp index d32f9de03b..25f96062fb 100644 --- a/components/core/src/streaming_archive/writer/Archive.cpp +++ b/components/core/src/streaming_archive/writer/Archive.cpp @@ -265,13 +265,15 @@ namespace streaming_archive { namespace writer { void Archive::write_msg (File& file, epochtime_t timestamp, const string& message, size_t num_uncompressed_bytes) { vector encoded_vars; - EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars); + // ids of variable type in the message + vector added_var_ids; + EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars, added_var_ids); logtype_dictionary_id_t logtype_id; if (m_logtype_dict.add_occurrence(m_logtype_dict_entry_wrapper, logtype_id)) { m_logtype_dict_entry_wrapper = make_unique(); } - file.write_encoded_msg(timestamp, logtype_id, encoded_vars, num_uncompressed_bytes); + file.write_encoded_msg(timestamp, logtype_id, encoded_vars, added_var_ids, num_uncompressed_bytes); } void Archive::write_dir_snapshot () { @@ -284,8 +286,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_uncommitted_entries_to_disk(); - m_var_dict.write_uncommitted_entries_to_disk(); + m_logtype_dict.write_dictionary_info_to_disk(); + m_var_dict.write_dictionary_info_to_disk(); } void Archive::append_file_to_segment (File*& file, Segment& segment, unordered_set& logtype_ids_in_segment, @@ -369,8 +371,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_uncommitted_entries_to_disk(); - m_var_dict.write_uncommitted_entries_to_disk(); + m_logtype_dict.write_dictionary_info_to_disk(); + m_var_dict.write_dictionary_info_to_disk(); for (auto file : files) { file->mark_as_in_committed_segment(); diff --git a/components/core/src/streaming_archive/writer/File.cpp b/components/core/src/streaming_archive/writer/File.cpp index 3477a5886a..ea326ecd8c 100644 --- a/components/core/src/streaming_archive/writer/File.cpp +++ b/components/core/src/streaming_archive/writer/File.cpp @@ -12,7 +12,7 @@ namespace streaming_archive { namespace writer { if (m_is_written_out) { throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__); } - + m_variable_ids = new std::unordered_set; m_is_open = true; } @@ -44,15 +44,26 @@ namespace streaming_archive { namespace writer { m_timestamps.clear(); m_logtypes.clear(); m_variables.clear(); + + // release the memory + m_variable_ids->clear(); + delete m_variable_ids; + m_variable_ids = nullptr; } void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - size_t num_uncompressed_bytes) + const std::vector& added_var_ids, size_t num_uncompressed_bytes) { m_timestamps.push_back(timestamp); m_logtypes.push_back(logtype_id); m_variables.push_back_all(encoded_vars); + // insert ids seen from log message to the file + // duplicate ids will be handled by unordered_set internally + for(const auto& id : added_var_ids){ + m_variable_ids->emplace(id); + } + // Update metadata ++m_num_messages; m_num_variables += encoded_vars.size(); @@ -117,31 +128,14 @@ namespace streaming_archive { namespace writer { unordered_set& segment_logtype_ids, unordered_set& segment_var_ids) { - size_t var_ix = 0; + // Add logtype to set for (size_t i = 0; i < num_logtypes; ++i) { - // Add logtype to set auto logtype_id = logtype_ids[i]; segment_logtype_ids.emplace(logtype_id); - - // Get logtype dictionary entry - auto logtype_dict_entry_ptr = logtype_dict.get_entry(logtype_id); - auto& logtype_dict_entry = *logtype_dict_entry_ptr; - - // Get number of variables in logtype - auto msg_num_vars = logtype_dict_entry.get_num_vars(); - if (var_ix + msg_num_vars > num_vars) { - throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); - } - - // If variable is a variable dictionary ID, decode it and add it to the set - for (size_t msg_var_ix = 0; msg_var_ix < msg_num_vars; ++msg_var_ix, ++var_ix) { - if (LogTypeDictionaryEntry::VarDelim::NonDouble == logtype_dict_entry.get_var_delim(msg_var_ix)) { - auto var = vars[var_ix]; - if (EncodedVariableInterpreter::is_var_dict_id(var)) { - segment_var_ids.emplace(EncodedVariableInterpreter::decode_var_dict_id(var)); - } - } - } + } + // Add vartype to set + for(const variable_dictionary_id_t& id : *m_variable_ids){ + segment_var_ids.emplace(id); } } diff --git a/components/core/src/streaming_archive/writer/File.hpp b/components/core/src/streaming_archive/writer/File.hpp index 8b511eefd1..5b9a9b4e54 100644 --- a/components/core/src/streaming_archive/writer/File.hpp +++ b/components/core/src/streaming_archive/writer/File.hpp @@ -57,7 +57,8 @@ namespace streaming_archive { namespace writer { m_segmentation_state(SegmentationState_NotInSegment), m_is_metadata_clean(false), m_is_written_out(false), - m_is_open(false) + m_is_open(false), + m_variable_ids(nullptr) {} // Destructor @@ -84,7 +85,7 @@ namespace streaming_archive { namespace writer { * @param num_uncompressed_bytes */ void write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - size_t num_uncompressed_bytes); + const std::vector& added_vars_ids, size_t num_uncompressed_bytes); /** * Changes timestamp pattern in use at current message in file @@ -182,7 +183,7 @@ namespace streaming_archive { namespace writer { * @param segment_logtype_ids * @param segment_var_ids */ - static void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, + void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, std::unordered_set& segment_logtype_ids, std::unordered_set& segment_var_ids); @@ -229,6 +230,7 @@ namespace streaming_archive { namespace writer { PageAllocatedVector m_timestamps; PageAllocatedVector m_logtypes; PageAllocatedVector m_variables; + std::unordered_set * m_variable_ids; // State variables SegmentationState m_segmentation_state;