From fa435ab5f3f74951694c85112e0d36453c7d5347 Mon Sep 17 00:00:00 2001 From: Haiqi Date: Fri, 26 Nov 2021 13:23:52 -0500 Subject: [PATCH 01/13] 1.Remove uncommited entries from memory during compression. 2.Updated how ids_in_segment gets populated --- components/core/src/DictionaryWriter.hpp | 61 +++---------------- .../core/src/EncodedVariableInterpreter.cpp | 4 +- .../core/src/EncodedVariableInterpreter.hpp | 2 +- .../core/src/LogTypeDictionaryWriter.cpp | 21 +++---- .../core/src/VariableDictionaryWriter.cpp | 18 +++--- .../src/streaming_archive/writer/Archive.cpp | 14 +++-- .../src/streaming_archive/writer/File.cpp | 42 ++++++------- .../src/streaming_archive/writer/File.hpp | 8 ++- 8 files changed, 64 insertions(+), 106 deletions(-) diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index cddcd9866..3ad9b0483 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -53,16 +53,9 @@ class DictionaryWriter { void close (); /** - * Gets the entry with the given ID - * @param id - * @return Pointer to the entry with the given ID + * Writes dict_type count and flush compressors to file */ - const EntryType* get_entry (DictionaryIdType id) const; - - /** - * Writes uncommitted dictionary entries to file - */ - void write_uncommitted_entries_to_disk (); + void write_dictionary_info_to_disk (); /** * Adds the given segment and IDs to the segment index @@ -85,22 +78,19 @@ class DictionaryWriter { protected: // Types - typedef std::unordered_map value_to_entry_t; - typedef std::unordered_map id_to_entry_t; + typedef std::unordered_map value_to_id_t; // Variables bool m_is_open; // Variables related to on-disk storage - std::vector m_uncommitted_entries; FileWriter m_dictionary_file_writer; streaming_compression::zstd::Compressor m_dictionary_compressor; FileWriter m_segment_index_file_writer; streaming_compression::zstd::Compressor m_segment_index_compressor; size_t m_num_segments_in_index; - value_to_entry_t m_value_to_entry; - id_to_entry_t m_id_to_entry; + value_to_id_t m_value_to_id; DictionaryIdType m_next_id; DictionaryIdType m_max_id; @@ -110,12 +100,7 @@ class DictionaryWriter { template DictionaryWriter::~DictionaryWriter () { - if (false == m_uncommitted_entries.empty()) { - SPDLOG_ERROR("DictionaryWriter contains uncommitted entries while being destroyed - possible data loss."); - } - for (const auto& value_entry_pair : m_value_to_entry) { - delete value_entry_pair.second; - } + // Do nothing } template @@ -151,60 +136,34 @@ void DictionaryWriter::close () { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } - write_uncommitted_entries_to_disk(); + write_dictionary_info_to_disk(); m_segment_index_compressor.close(); m_segment_index_file_writer.close(); m_dictionary_compressor.close(); m_dictionary_file_writer.close(); - // Delete entries and clear maps - for (const auto& value_entry_pair : m_value_to_entry) { - delete value_entry_pair.second; - } - m_id_to_entry.clear(); - m_value_to_entry.clear(); + // clear maps + m_value_to_id.clear(); m_is_open = false; } template -const EntryType* DictionaryWriter::get_entry (DictionaryIdType id) const { - if (false == m_is_open) { - throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); - } - - if (m_id_to_entry.count(id) == 0) { - throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); - } - return m_id_to_entry.at(id); -} - -template -void DictionaryWriter::write_uncommitted_entries_to_disk () { +void DictionaryWriter::write_dictionary_info_to_disk () { if (false == m_is_open) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } - if (m_uncommitted_entries.empty()) { - // Nothing to do - return; - } - - for (auto entry : m_uncommitted_entries) { - entry->write_to_file(m_dictionary_compressor); - } - // Update header auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos(); m_dictionary_file_writer.seek_from_begin(0); - m_dictionary_file_writer.write_numeric_value(m_id_to_entry.size()); + m_dictionary_file_writer.write_numeric_value(m_value_to_id.size()); m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos); m_segment_index_compressor.flush(); m_segment_index_file_writer.flush(); m_dictionary_compressor.flush(); m_dictionary_file_writer.flush(); - m_uncommitted_entries.clear(); } template diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index 1e26fda7e..fbf41be5d 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -203,7 +203,8 @@ void EncodedVariableInterpreter::convert_encoded_double_to_string (encoded_varia } void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& message, LogTypeDictionaryEntry& logtype_dict_entry, - VariableDictionaryWriter& var_dict, vector& encoded_vars) + VariableDictionaryWriter& var_dict, vector& encoded_vars, + vector& added_vars) { // Extract all variables and add to dictionary while building logtype size_t tok_begin_pos = 0; @@ -225,6 +226,7 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes variable_dictionary_id_t id; var_dict.add_occurrence(var_str, id); encoded_var = encode_var_dict_id(id); + added_vars.push_back(id); logtype_dict_entry.add_non_double_var(); } diff --git a/components/core/src/EncodedVariableInterpreter.hpp b/components/core/src/EncodedVariableInterpreter.hpp index 551f2bbc7..b2725394b 100644 --- a/components/core/src/EncodedVariableInterpreter.hpp +++ b/components/core/src/EncodedVariableInterpreter.hpp @@ -77,7 +77,7 @@ class EncodedVariableInterpreter { * @param encoded_vars */ static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict, - std::vector& encoded_vars); + std::vector& encoded_vars, std::vector& added_vars); /** * Decodes all variables and decompresses them into a message * @param logtype_dict_entry diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index 6b01228e6..d9de8f8e8 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -29,7 +29,6 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa logtype_dict_entry_wrapper->read_from_file(dictionary_decompressor); add_occurrence(logtype_dict_entry_wrapper, id); } - m_uncommitted_entries.clear(); segment_index_decompressor.close(); segment_index_file_reader.close(); @@ -56,11 +55,10 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptrsecond; - logtype_id = existing_entry->get_id(); + const auto ix = m_value_to_id.find(value); + if (m_value_to_id.end() != ix) { + // Entry exists so get its ID + logtype_id = ix->second; } else { // Dictionary entry doesn't exist so create it @@ -90,16 +88,17 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptrget_data_size(); + + // write the new entry to the compressor and free up the memory + entry_ptr->write_to_file(m_dictionary_compressor); + delete entry_ptr; } return is_new_entry; } diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index 5a3e9ce41..e8ef5a4ed 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -32,7 +32,6 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, var_dict_entry.read_from_file(dictionary_decompressor); add_occurrence(var_dict_entry.get_value(), id); } - m_uncommitted_entries.clear(); segment_index_decompressor.close(); segment_index_file_reader.close(); @@ -53,9 +52,9 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dictionary_id_t& id) { bool new_entry = false; - const auto ix = m_value_to_entry.find(value); - if (m_value_to_entry.end() != ix) { - id = ix->second->get_id(); + const auto ix = m_value_to_id.find(value); + if (m_value_to_id.end() != ix) { + id = ix->second; } else { // Entry doesn't exist so create it @@ -70,16 +69,17 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic // Insert the ID obtained from the database into the dictionary auto* entry = new VariableDictionaryEntry(value, id); - m_value_to_entry[value] = entry; - m_id_to_entry[id] = entry; - - // Mark ID as dirty - m_uncommitted_entries.emplace_back(entry); + m_value_to_id[value] = id; new_entry = true; // TODO: This doesn't account for the segment index that's constantly updated + // Nor does it account that compression doesn't store the whole entry in the memory m_data_size += entry->get_data_size(); + + // write the new entry to the compressor and free up the memory + entry->write_to_file(m_dictionary_compressor); + delete entry; } return new_entry; } diff --git a/components/core/src/streaming_archive/writer/Archive.cpp b/components/core/src/streaming_archive/writer/Archive.cpp index d32f9de03..25f96062f 100644 --- a/components/core/src/streaming_archive/writer/Archive.cpp +++ b/components/core/src/streaming_archive/writer/Archive.cpp @@ -265,13 +265,15 @@ namespace streaming_archive { namespace writer { void Archive::write_msg (File& file, epochtime_t timestamp, const string& message, size_t num_uncompressed_bytes) { vector encoded_vars; - EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars); + // ids of variable type in the message + vector added_var_ids; + EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars, added_var_ids); logtype_dictionary_id_t logtype_id; if (m_logtype_dict.add_occurrence(m_logtype_dict_entry_wrapper, logtype_id)) { m_logtype_dict_entry_wrapper = make_unique(); } - file.write_encoded_msg(timestamp, logtype_id, encoded_vars, num_uncompressed_bytes); + file.write_encoded_msg(timestamp, logtype_id, encoded_vars, added_var_ids, num_uncompressed_bytes); } void Archive::write_dir_snapshot () { @@ -284,8 +286,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_uncommitted_entries_to_disk(); - m_var_dict.write_uncommitted_entries_to_disk(); + m_logtype_dict.write_dictionary_info_to_disk(); + m_var_dict.write_dictionary_info_to_disk(); } void Archive::append_file_to_segment (File*& file, Segment& segment, unordered_set& logtype_ids_in_segment, @@ -369,8 +371,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_uncommitted_entries_to_disk(); - m_var_dict.write_uncommitted_entries_to_disk(); + m_logtype_dict.write_dictionary_info_to_disk(); + m_var_dict.write_dictionary_info_to_disk(); for (auto file : files) { file->mark_as_in_committed_segment(); diff --git a/components/core/src/streaming_archive/writer/File.cpp b/components/core/src/streaming_archive/writer/File.cpp index 3477a5886..ea326ecd8 100644 --- a/components/core/src/streaming_archive/writer/File.cpp +++ b/components/core/src/streaming_archive/writer/File.cpp @@ -12,7 +12,7 @@ namespace streaming_archive { namespace writer { if (m_is_written_out) { throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__); } - + m_variable_ids = new std::unordered_set; m_is_open = true; } @@ -44,15 +44,26 @@ namespace streaming_archive { namespace writer { m_timestamps.clear(); m_logtypes.clear(); m_variables.clear(); + + // release the memory + m_variable_ids->clear(); + delete m_variable_ids; + m_variable_ids = nullptr; } void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - size_t num_uncompressed_bytes) + const std::vector& added_var_ids, size_t num_uncompressed_bytes) { m_timestamps.push_back(timestamp); m_logtypes.push_back(logtype_id); m_variables.push_back_all(encoded_vars); + // insert ids seen from log message to the file + // duplicate ids will be handled by unordered_set internally + for(const auto& id : added_var_ids){ + m_variable_ids->emplace(id); + } + // Update metadata ++m_num_messages; m_num_variables += encoded_vars.size(); @@ -117,31 +128,14 @@ namespace streaming_archive { namespace writer { unordered_set& segment_logtype_ids, unordered_set& segment_var_ids) { - size_t var_ix = 0; + // Add logtype to set for (size_t i = 0; i < num_logtypes; ++i) { - // Add logtype to set auto logtype_id = logtype_ids[i]; segment_logtype_ids.emplace(logtype_id); - - // Get logtype dictionary entry - auto logtype_dict_entry_ptr = logtype_dict.get_entry(logtype_id); - auto& logtype_dict_entry = *logtype_dict_entry_ptr; - - // Get number of variables in logtype - auto msg_num_vars = logtype_dict_entry.get_num_vars(); - if (var_ix + msg_num_vars > num_vars) { - throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); - } - - // If variable is a variable dictionary ID, decode it and add it to the set - for (size_t msg_var_ix = 0; msg_var_ix < msg_num_vars; ++msg_var_ix, ++var_ix) { - if (LogTypeDictionaryEntry::VarDelim::NonDouble == logtype_dict_entry.get_var_delim(msg_var_ix)) { - auto var = vars[var_ix]; - if (EncodedVariableInterpreter::is_var_dict_id(var)) { - segment_var_ids.emplace(EncodedVariableInterpreter::decode_var_dict_id(var)); - } - } - } + } + // Add vartype to set + for(const variable_dictionary_id_t& id : *m_variable_ids){ + segment_var_ids.emplace(id); } } diff --git a/components/core/src/streaming_archive/writer/File.hpp b/components/core/src/streaming_archive/writer/File.hpp index 8b511eefd..5b9a9b4e5 100644 --- a/components/core/src/streaming_archive/writer/File.hpp +++ b/components/core/src/streaming_archive/writer/File.hpp @@ -57,7 +57,8 @@ namespace streaming_archive { namespace writer { m_segmentation_state(SegmentationState_NotInSegment), m_is_metadata_clean(false), m_is_written_out(false), - m_is_open(false) + m_is_open(false), + m_variable_ids(nullptr) {} // Destructor @@ -84,7 +85,7 @@ namespace streaming_archive { namespace writer { * @param num_uncompressed_bytes */ void write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - size_t num_uncompressed_bytes); + const std::vector& added_vars_ids, size_t num_uncompressed_bytes); /** * Changes timestamp pattern in use at current message in file @@ -182,7 +183,7 @@ namespace streaming_archive { namespace writer { * @param segment_logtype_ids * @param segment_var_ids */ - static void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, + void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, std::unordered_set& segment_logtype_ids, std::unordered_set& segment_var_ids); @@ -229,6 +230,7 @@ namespace streaming_archive { namespace writer { PageAllocatedVector m_timestamps; PageAllocatedVector m_logtypes; PageAllocatedVector m_variables; + std::unordered_set * m_variable_ids; // State variables SegmentationState m_segmentation_state; From c34dcfab536cc508067dcd3a240e3060558cff60 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Fri, 26 Nov 2021 16:40:49 -0500 Subject: [PATCH 02/13] Updated unit-test --- .../core/tests/test-EncodedVariableInterpreter.cpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/components/core/tests/test-EncodedVariableInterpreter.cpp b/components/core/tests/test-EncodedVariableInterpreter.cpp index 1790ffd43..cea8a7908 100644 --- a/components/core/tests/test-EncodedVariableInterpreter.cpp +++ b/components/core/tests/test-EncodedVariableInterpreter.cpp @@ -241,14 +241,23 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { // Test encoding vector encoded_vars; + vector added_var_ids; vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), "-25.5196868642755", "-00.00"}; msg = "here is a string with a small int " + var_strs[0] + " and a very large int " + var_strs[1] + " and a double " + var_strs[2] + " and a weird double " + var_strs[3]; LogTypeDictionaryEntry logtype_dict_entry; - EncodedVariableInterpreter::encode_and_add_to_dictionary(msg, logtype_dict_entry, var_dict_writer, encoded_vars); - + EncodedVariableInterpreter::encode_and_add_to_dictionary(msg, logtype_dict_entry, var_dict_writer, encoded_vars, added_var_ids); var_dict_writer.close(); + // Test added_var_ids is correctly populated + size_t encoded_var_id_count = 0; + for(const auto& var : encoded_vars) { + if(EncodedVariableInterpreter::is_var_dict_id(var)){ + REQUIRE(added_var_ids.size() > encoded_var_id_count); + REQUIRE(EncodedVariableInterpreter::decode_var_dict_id(var) == added_var_ids[encoded_var_id_count]); + } + } + // Open reader VariableDictionaryReader var_dict_reader; var_dict_reader.open(cVarDictPath, cVarSegmentIndexPath); From 667251e1d5a4af21ce1831a576c4f5a7490b69af Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Fri, 26 Nov 2021 16:51:23 -0500 Subject: [PATCH 03/13] Fix a mistake in the test and added a new variable string that will be added to the added_var_ids vector so as to improve testing coverage --- components/core/tests/test-EncodedVariableInterpreter.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/components/core/tests/test-EncodedVariableInterpreter.cpp b/components/core/tests/test-EncodedVariableInterpreter.cpp index cea8a7908..74ebc4dda 100644 --- a/components/core/tests/test-EncodedVariableInterpreter.cpp +++ b/components/core/tests/test-EncodedVariableInterpreter.cpp @@ -242,9 +242,9 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { // Test encoding vector encoded_vars; vector added_var_ids; - vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), "-25.5196868642755", "-00.00"}; + vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), "-25.5196868642755", "-00.00", "bin/python2.7.3"}; msg = "here is a string with a small int " + var_strs[0] + " and a very large int " + var_strs[1] + " and a double " + var_strs[2] + - " and a weird double " + var_strs[3]; + " and a weird double " + var_strs[3] + " and a str with numbers " + var_strs[4]; LogTypeDictionaryEntry logtype_dict_entry; EncodedVariableInterpreter::encode_and_add_to_dictionary(msg, logtype_dict_entry, var_dict_writer, encoded_vars, added_var_ids); var_dict_writer.close(); @@ -255,8 +255,10 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { if(EncodedVariableInterpreter::is_var_dict_id(var)){ REQUIRE(added_var_ids.size() > encoded_var_id_count); REQUIRE(EncodedVariableInterpreter::decode_var_dict_id(var) == added_var_ids[encoded_var_id_count]); + encoded_var_id_count++; } } + REQUIRE(added_var_ids.size() == encoded_var_id_count); // Open reader VariableDictionaryReader var_dict_reader; @@ -273,6 +275,8 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { REQUIRE(EncodedVariableInterpreter::encode_and_search_dictionary(var_strs[2], var_dict_reader, false, search_logtype, sub_query)); search_logtype += " and a weird double "; REQUIRE(EncodedVariableInterpreter::encode_and_search_dictionary(var_strs[3], var_dict_reader, false, search_logtype, sub_query)); + search_logtype += " and a str with numbers "; + REQUIRE(EncodedVariableInterpreter::encode_and_search_dictionary(var_strs[4], var_dict_reader, false, search_logtype, sub_query)); auto& vars = sub_query.get_vars(); REQUIRE(vars.size() == encoded_vars.size()); for (size_t i = 0; i < vars.size(); ++i) { From 6a74d3c09c957a448a6c8273ed80d1a98e948e04 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Fri, 26 Nov 2021 21:45:43 -0500 Subject: [PATCH 04/13] address some issues that are easier to fix in the pull request review --- components/core/src/DictionaryWriter.hpp | 16 +++++----------- .../core/src/EncodedVariableInterpreter.hpp | 3 ++- components/core/src/LogTypeDictionaryWriter.cpp | 2 -- components/core/src/VariableDictionaryWriter.cpp | 1 - .../src/streaming_archive/writer/Archive.cpp | 15 +++++++-------- .../core/src/streaming_archive/writer/File.cpp | 6 ++---- .../core/src/streaming_archive/writer/File.hpp | 15 ++++++++------- .../tests/test-EncodedVariableInterpreter.cpp | 15 ++++++++------- 8 files changed, 32 insertions(+), 41 deletions(-) diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index 3ad9b0483..0e93ef93c 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -38,7 +38,7 @@ class DictionaryWriter { // Constructors DictionaryWriter () : m_is_open(false) {} - ~DictionaryWriter (); + ~DictionaryWriter () = default; // Methods /** @@ -53,9 +53,9 @@ class DictionaryWriter { void close (); /** - * Writes dict_type count and flush compressors to file + * Writes the dictionary's header and flushes unwritten content to disk */ - void write_dictionary_info_to_disk (); + void write_header_and_flush_to_disk (); /** * Adds the given segment and IDs to the segment index @@ -98,11 +98,6 @@ class DictionaryWriter { size_t m_data_size; }; -template -DictionaryWriter::~DictionaryWriter () { - // Do nothing -} - template void DictionaryWriter::open (const std::string& dictionary_path, const std::string& segment_index_path, DictionaryIdType max_id) { if (m_is_open) { @@ -136,20 +131,19 @@ void DictionaryWriter::close () { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } - write_dictionary_info_to_disk(); + write_header_and_flush_to_disk(); m_segment_index_compressor.close(); m_segment_index_file_writer.close(); m_dictionary_compressor.close(); m_dictionary_file_writer.close(); - // clear maps m_value_to_id.clear(); m_is_open = false; } template -void DictionaryWriter::write_dictionary_info_to_disk () { +void DictionaryWriter::write_header_and_flush_to_disk () { if (false == m_is_open) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); } diff --git a/components/core/src/EncodedVariableInterpreter.hpp b/components/core/src/EncodedVariableInterpreter.hpp index b2725394b..4c0cb6db1 100644 --- a/components/core/src/EncodedVariableInterpreter.hpp +++ b/components/core/src/EncodedVariableInterpreter.hpp @@ -75,9 +75,10 @@ class EncodedVariableInterpreter { * @param logtype_dict_entry * @param var_dict * @param encoded_vars + * @param var_ids */ static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict, - std::vector& encoded_vars, std::vector& added_vars); + std::vector& encoded_vars, std::vector& var_ids); /** * Decodes all variables and decompresses them into a message * @param logtype_dict_entry diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index d9de8f8e8..26cb41762 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -93,10 +93,8 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptrget_data_size(); - // write the new entry to the compressor and free up the memory entry_ptr->write_to_file(m_dictionary_compressor); delete entry_ptr; } diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index e8ef5a4ed..e238fee95 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -74,7 +74,6 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic new_entry = true; // TODO: This doesn't account for the segment index that's constantly updated - // Nor does it account that compression doesn't store the whole entry in the memory m_data_size += entry->get_data_size(); // write the new entry to the compressor and free up the memory diff --git a/components/core/src/streaming_archive/writer/Archive.cpp b/components/core/src/streaming_archive/writer/Archive.cpp index 25f96062f..abb06f33b 100644 --- a/components/core/src/streaming_archive/writer/Archive.cpp +++ b/components/core/src/streaming_archive/writer/Archive.cpp @@ -265,15 +265,14 @@ namespace streaming_archive { namespace writer { void Archive::write_msg (File& file, epochtime_t timestamp, const string& message, size_t num_uncompressed_bytes) { vector encoded_vars; - // ids of variable type in the message - vector added_var_ids; - EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars, added_var_ids); + vector var_ids; + EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars, var_ids); logtype_dictionary_id_t logtype_id; if (m_logtype_dict.add_occurrence(m_logtype_dict_entry_wrapper, logtype_id)) { m_logtype_dict_entry_wrapper = make_unique(); } - file.write_encoded_msg(timestamp, logtype_id, encoded_vars, added_var_ids, num_uncompressed_bytes); + file.write_encoded_msg(timestamp, logtype_id, encoded_vars, var_ids, num_uncompressed_bytes); } void Archive::write_dir_snapshot () { @@ -286,8 +285,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_dictionary_info_to_disk(); - m_var_dict.write_dictionary_info_to_disk(); + m_logtype_dict.write_header_and_flush_to_disk(); + m_var_dict.write_header_and_flush_to_disk(); } void Archive::append_file_to_segment (File*& file, Segment& segment, unordered_set& logtype_ids_in_segment, @@ -371,8 +370,8 @@ namespace streaming_archive { namespace writer { #endif // Flush dictionaries - m_logtype_dict.write_dictionary_info_to_disk(); - m_var_dict.write_dictionary_info_to_disk(); + m_logtype_dict.write_header_and_flush_to_disk(); + m_var_dict.write_header_and_flush_to_disk(); for (auto file : files) { file->mark_as_in_committed_segment(); diff --git a/components/core/src/streaming_archive/writer/File.cpp b/components/core/src/streaming_archive/writer/File.cpp index ea326ecd8..a7a188445 100644 --- a/components/core/src/streaming_archive/writer/File.cpp +++ b/components/core/src/streaming_archive/writer/File.cpp @@ -46,7 +46,6 @@ namespace streaming_archive { namespace writer { m_variables.clear(); // release the memory - m_variable_ids->clear(); delete m_variable_ids; m_variable_ids = nullptr; } @@ -58,8 +57,7 @@ namespace streaming_archive { namespace writer { m_logtypes.push_back(logtype_id); m_variables.push_back_all(encoded_vars); - // insert ids seen from log message to the file - // duplicate ids will be handled by unordered_set internally + // Insert message's variable IDs into the file's variable ID set for(const auto& id : added_var_ids){ m_variable_ids->emplace(id); } @@ -134,7 +132,7 @@ namespace streaming_archive { namespace writer { segment_logtype_ids.emplace(logtype_id); } // Add vartype to set - for(const variable_dictionary_id_t& id : *m_variable_ids){ + for (const variable_dictionary_id_t& id : *m_variable_ids) { segment_var_ids.emplace(id); } } diff --git a/components/core/src/streaming_archive/writer/File.hpp b/components/core/src/streaming_archive/writer/File.hpp index 5b9a9b4e5..39e923f68 100644 --- a/components/core/src/streaming_archive/writer/File.hpp +++ b/components/core/src/streaming_archive/writer/File.hpp @@ -54,11 +54,11 @@ namespace streaming_archive { namespace writer { m_segment_variables_pos(0), m_is_split(split_ix > 0), m_split_ix(split_ix), + m_variable_ids(nullptr), m_segmentation_state(SegmentationState_NotInSegment), m_is_metadata_clean(false), m_is_written_out(false), - m_is_open(false), - m_variable_ids(nullptr) + m_is_open(false) {} // Destructor @@ -82,10 +82,11 @@ namespace streaming_archive { namespace writer { * @param timestamp * @param logtype_id * @param encoded_vars + * @param var_ids * @param num_uncompressed_bytes */ void write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - const std::vector& added_vars_ids, size_t num_uncompressed_bytes); + const std::vector& var_ids, size_t num_uncompressed_bytes); /** * Changes timestamp pattern in use at current message in file @@ -184,9 +185,9 @@ namespace streaming_archive { namespace writer { * @param segment_var_ids */ void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, - size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, - std::unordered_set& segment_logtype_ids, - std::unordered_set& segment_var_ids); + size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, + std::unordered_set& segment_logtype_ids, + std::unordered_set& segment_var_ids); /** * Sets segment-related metadata to the given values @@ -230,7 +231,7 @@ namespace streaming_archive { namespace writer { PageAllocatedVector m_timestamps; PageAllocatedVector m_logtypes; PageAllocatedVector m_variables; - std::unordered_set * m_variable_ids; + std::unordered_set* m_variable_ids; // State variables SegmentationState m_segmentation_state; diff --git a/components/core/tests/test-EncodedVariableInterpreter.cpp b/components/core/tests/test-EncodedVariableInterpreter.cpp index 74ebc4dda..5985ddaab 100644 --- a/components/core/tests/test-EncodedVariableInterpreter.cpp +++ b/components/core/tests/test-EncodedVariableInterpreter.cpp @@ -242,7 +242,8 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { // Test encoding vector encoded_vars; vector added_var_ids; - vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), "-25.5196868642755", "-00.00", "bin/python2.7.3"}; + vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), + "-25.5196868642755", "-00.00", "bin/python2.7.3"}; msg = "here is a string with a small int " + var_strs[0] + " and a very large int " + var_strs[1] + " and a double " + var_strs[2] + " and a weird double " + var_strs[3] + " and a str with numbers " + var_strs[4]; LogTypeDictionaryEntry logtype_dict_entry; @@ -250,15 +251,15 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { var_dict_writer.close(); // Test added_var_ids is correctly populated - size_t encoded_var_id_count = 0; - for(const auto& var : encoded_vars) { + size_t encoded_var_id_ix = 0; + for (const auto& var : encoded_vars) { if(EncodedVariableInterpreter::is_var_dict_id(var)){ - REQUIRE(added_var_ids.size() > encoded_var_id_count); - REQUIRE(EncodedVariableInterpreter::decode_var_dict_id(var) == added_var_ids[encoded_var_id_count]); - encoded_var_id_count++; + REQUIRE(added_var_ids.size() > encoded_var_id_ix); + REQUIRE(EncodedVariableInterpreter::decode_var_dict_id(var) == added_var_ids[encoded_var_id_ix]); + encoded_var_id_ix++; } } - REQUIRE(added_var_ids.size() == encoded_var_id_count); + REQUIRE(added_var_ids.size() == encoded_var_id_ix); // Open reader VariableDictionaryReader var_dict_reader; From 611c992b43bddbdddf202e5731ff77795ebd08b6 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Fri, 26 Nov 2021 21:51:28 -0500 Subject: [PATCH 05/13] fix a variable name mismatch that was not caught by compiler and remove redundant comment --- components/core/src/EncodedVariableInterpreter.cpp | 4 ++-- components/core/src/VariableDictionaryWriter.cpp | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index fbf41be5d..e7f372169 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -204,7 +204,7 @@ void EncodedVariableInterpreter::convert_encoded_double_to_string (encoded_varia void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict, vector& encoded_vars, - vector& added_vars) + vector& var_ids) { // Extract all variables and add to dictionary while building logtype size_t tok_begin_pos = 0; @@ -226,7 +226,7 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes variable_dictionary_id_t id; var_dict.add_occurrence(var_str, id); encoded_var = encode_var_dict_id(id); - added_vars.push_back(id); + var_ids.push_back(id); logtype_dict_entry.add_non_double_var(); } diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index e238fee95..ee8db01aa 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -76,7 +76,6 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic // TODO: This doesn't account for the segment index that's constantly updated m_data_size += entry->get_data_size(); - // write the new entry to the compressor and free up the memory entry->write_to_file(m_dictionary_compressor); delete entry; } From 5dd2954401e3411fd4f39ab59cb88e17d667a974 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Fri, 26 Nov 2021 22:13:18 -0500 Subject: [PATCH 06/13] remove unused arguments from function --- .../core/src/streaming_archive/writer/File.cpp | 12 +++++------- .../core/src/streaming_archive/writer/File.hpp | 6 +----- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/components/core/src/streaming_archive/writer/File.cpp b/components/core/src/streaming_archive/writer/File.cpp index a7a188445..0b51270e0 100644 --- a/components/core/src/streaming_archive/writer/File.cpp +++ b/components/core/src/streaming_archive/writer/File.cpp @@ -26,8 +26,7 @@ namespace streaming_archive { namespace writer { // Add file's logtype and variable IDs to respective segment sets auto logtype_ids = m_logtypes.data(); auto variables = m_variables.data(); - append_logtype_and_var_ids_to_segment_sets(logtype_dict, logtype_ids, m_logtypes.size(), variables, m_variables.size(), segment_logtype_ids, - segment_var_ids); + append_logtype_and_var_ids_to_segment_sets(logtype_ids, m_logtypes.size(), segment_logtype_ids, segment_var_ids); // Append files to segment uint64_t segment_timestamps_uncompressed_pos; @@ -58,7 +57,7 @@ namespace streaming_archive { namespace writer { m_variables.push_back_all(encoded_vars); // Insert message's variable IDs into the file's variable ID set - for(const auto& id : added_var_ids){ + for (const auto& id : added_var_ids) { m_variable_ids->emplace(id); } @@ -121,17 +120,16 @@ namespace streaming_archive { namespace writer { return encoded_timestamp_patterns; } - void File::append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, - size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, + void File::append_logtype_and_var_ids_to_segment_sets (const logtype_dictionary_id_t* logtype_ids, size_t num_logtypes, unordered_set& segment_logtype_ids, unordered_set& segment_var_ids) { - // Add logtype to set + // Add logtype IDs for (size_t i = 0; i < num_logtypes; ++i) { auto logtype_id = logtype_ids[i]; segment_logtype_ids.emplace(logtype_id); } - // Add vartype to set + // Add variable IDs for (const variable_dictionary_id_t& id : *m_variable_ids) { segment_var_ids.emplace(id); } diff --git a/components/core/src/streaming_archive/writer/File.hpp b/components/core/src/streaming_archive/writer/File.hpp index 39e923f68..c695097e5 100644 --- a/components/core/src/streaming_archive/writer/File.hpp +++ b/components/core/src/streaming_archive/writer/File.hpp @@ -176,16 +176,12 @@ namespace streaming_archive { namespace writer { // Methods /** * Takes logtype and variable IDs from a file's logtype and variable columns and appends them to the given sets - * @param logtype_dict * @param logtype_ids * @param num_logtypes - * @param vars - * @param num_vars * @param segment_logtype_ids * @param segment_var_ids */ - void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids, - size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars, + void append_logtype_and_var_ids_to_segment_sets (const logtype_dictionary_id_t* logtype_ids, size_t num_logtypes, std::unordered_set& segment_logtype_ids, std::unordered_set& segment_var_ids); From 5f12a10ffe0542b0b375955c6670b9a6595e7633 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Fri, 26 Nov 2021 23:43:15 -0500 Subject: [PATCH 07/13] replace dynamically created entry with static ones. updated the open_and_preload flow to directly write str_value of entry into the hash map --- components/core/src/DictionaryWriter.hpp | 26 ++++++++++++++++++ .../core/src/EncodedVariableInterpreter.cpp | 1 - .../core/src/LogTypeDictionaryWriter.cpp | 27 +++++++++---------- .../core/src/LogTypeDictionaryWriter.hpp | 4 +-- .../core/src/VariableDictionaryEntry.cpp | 4 +++ .../core/src/VariableDictionaryEntry.hpp | 2 ++ .../core/src/VariableDictionaryWriter.cpp | 13 +++++---- .../src/streaming_archive/writer/Archive.cpp | 7 +++-- 8 files changed, 55 insertions(+), 29 deletions(-) diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index 0e93ef93c..92ccfc65a 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -57,6 +57,13 @@ class DictionaryWriter { */ void write_header_and_flush_to_disk (); + /** + * Adds the str_value of entry into the str_to_id map. + * Raises an error if the entry already exists in the map + * @param entry + */ + void insert_non_duplicate_value_into_hash_map (const EntryType& entry); + /** * Adds the given segment and IDs to the segment index * @param segment_id @@ -160,6 +167,25 @@ void DictionaryWriter::write_header_and_flush_to_di m_dictionary_file_writer.flush(); } +template +void DictionaryWriter::insert_non_duplicate_value_into_hash_map (const EntryType& entry) { + + const auto& str_value = entry.get_value(); + if (m_value_to_id.find(str_value) != m_value_to_id.end()) { + SPDLOG_ERROR("entry value already exists"); + throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); + } + if (m_next_id > m_max_id) { + SPDLOG_ERROR("DictionaryWriter ran out of IDs."); + throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__); + } + + // Assign ID + DictionaryIdType id = m_next_id; + m_next_id++; + m_value_to_id[str_value] = id; +} + template void DictionaryWriter::index_segment (segment_id_t segment_id, const std::unordered_set& ids) { if (false == m_is_open) { diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index e7f372169..04d734b56 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -211,7 +211,6 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes size_t next_delim_pos = 0; size_t last_var_end_pos = 0; string var_str; - logtype_dict_entry.clear(); // To avoid reallocating the logtype as we append to it, reserve enough space to hold the entire message logtype_dict_entry.reserve_constant_length(message.length()); while (logtype_dict_entry.parse_next_var(message, tok_begin_pos, next_delim_pos, last_var_end_pos, var_str)) { diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index 26cb41762..f39a81aeb 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -22,12 +22,12 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); - // Read new dictionary entries - logtype_dictionary_id_t id; + // Add new dictionary entries values into hash map + LogTypeDictionaryEntry logtype_entry; for (size_t i = 0; i < num_dictionary_entries; ++i) { - auto logtype_dict_entry_wrapper = std::make_unique(); - logtype_dict_entry_wrapper->read_from_file(dictionary_decompressor); - add_occurrence(logtype_dict_entry_wrapper, id); + logtype_entry.read_from_file(dictionary_decompressor); + insert_non_duplicate_value_into_hash_map(logtype_entry); + logtype_entry.clear(); } segment_index_decompressor.close(); @@ -46,15 +46,14 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa m_is_open = true; } -bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr& entry_wrapper, logtype_dictionary_id_t& logtype_id) { - if (nullptr == entry_wrapper) { +bool LogTypeDictionaryWriter::add_occurrence (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) { + if (logtype_entry.get_value().empty()) { throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); } - auto& entry = *entry_wrapper; bool is_new_entry = false; - const string& value = entry.get_value(); + const string& value = logtype_entry.get_value(); const auto ix = m_value_to_id.find(value); if (m_value_to_id.end() != ix) { // Entry exists so get its ID @@ -79,24 +78,22 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptrget_data_size(); + m_data_size += logtype_entry.get_data_size(); - entry_ptr->write_to_file(m_dictionary_compressor); - delete entry_ptr; + logtype_entry.write_to_file(m_dictionary_compressor); } return is_new_entry; } diff --git a/components/core/src/LogTypeDictionaryWriter.hpp b/components/core/src/LogTypeDictionaryWriter.hpp index 413fd5de5..6ee5f7327 100644 --- a/components/core/src/LogTypeDictionaryWriter.hpp +++ b/components/core/src/LogTypeDictionaryWriter.hpp @@ -39,10 +39,10 @@ class LogTypeDictionaryWriter : public DictionaryWriter& entry_wrapper, logtype_dictionary_id_t& logtype_id); + bool add_occurrence (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id); }; #endif // LOGTYPEDICTIONARYWRITER_HPP diff --git a/components/core/src/VariableDictionaryEntry.cpp b/components/core/src/VariableDictionaryEntry.cpp index 1758931b5..f4e96dd2b 100644 --- a/components/core/src/VariableDictionaryEntry.cpp +++ b/components/core/src/VariableDictionaryEntry.cpp @@ -4,6 +4,10 @@ size_t VariableDictionaryEntry::get_data_size () const { return sizeof(m_id) + m_value.length() + m_ids_of_segments_containing_entry.size() * sizeof(segment_id_t); } +void VariableDictionaryEntry::clear () { + m_value.clear(); +} + void VariableDictionaryEntry::write_to_file (streaming_compression::zstd::Compressor& compressor) const { compressor.write_numeric_value(m_id); compressor.write_numeric_value(m_value.length()); diff --git a/components/core/src/VariableDictionaryEntry.hpp b/components/core/src/VariableDictionaryEntry.hpp index f480d99d7..faa7eac10 100644 --- a/components/core/src/VariableDictionaryEntry.hpp +++ b/components/core/src/VariableDictionaryEntry.hpp @@ -44,6 +44,8 @@ class VariableDictionaryEntry : public DictionaryEntry */ size_t get_data_size () const; + void clear (); + /** * Writes an entry to file * @param compressor diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index ee8db01aa..c573b69ec 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -25,12 +25,12 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); - // Read new dictionary entries - variable_dictionary_id_t id; + // Add new dictionary entries values into hash map VariableDictionaryEntry var_dict_entry; for (size_t i = 0; i < num_dictionary_entries; ++i) { var_dict_entry.read_from_file(dictionary_decompressor); - add_occurrence(var_dict_entry.get_value(), id); + insert_non_duplicate_value_into_hash_map(var_dict_entry); + var_dict_entry.clear(); } segment_index_decompressor.close(); @@ -68,16 +68,15 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic ++m_next_id; // Insert the ID obtained from the database into the dictionary - auto* entry = new VariableDictionaryEntry(value, id); + auto entry = VariableDictionaryEntry(value, id); m_value_to_id[value] = id; new_entry = true; // TODO: This doesn't account for the segment index that's constantly updated - m_data_size += entry->get_data_size(); + m_data_size += entry.get_data_size(); - entry->write_to_file(m_dictionary_compressor); - delete entry; + entry.write_to_file(m_dictionary_compressor); } return new_entry; } diff --git a/components/core/src/streaming_archive/writer/Archive.cpp b/components/core/src/streaming_archive/writer/Archive.cpp index abb06f33b..033302749 100644 --- a/components/core/src/streaming_archive/writer/Archive.cpp +++ b/components/core/src/streaming_archive/writer/Archive.cpp @@ -266,11 +266,10 @@ namespace streaming_archive { namespace writer { void Archive::write_msg (File& file, epochtime_t timestamp, const string& message, size_t num_uncompressed_bytes) { vector encoded_vars; vector var_ids; - EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars, var_ids); + LogTypeDictionaryEntry logtype_entry; + EncodedVariableInterpreter::encode_and_add_to_dictionary(message, logtype_entry, m_var_dict, encoded_vars, var_ids); logtype_dictionary_id_t logtype_id; - if (m_logtype_dict.add_occurrence(m_logtype_dict_entry_wrapper, logtype_id)) { - m_logtype_dict_entry_wrapper = make_unique(); - } + m_logtype_dict.add_occurrence(logtype_entry, logtype_id); file.write_encoded_msg(timestamp, logtype_id, encoded_vars, var_ids, num_uncompressed_bytes); } From 2d4cbf14ac96139058cb089ed7c475c2f301305e Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Sat, 27 Nov 2021 10:15:40 -0500 Subject: [PATCH 08/13] better function refactor --- components/core/src/DictionaryWriter.hpp | 46 +++++++++++-------- .../core/src/LogTypeDictionaryWriter.cpp | 10 +--- .../core/src/VariableDictionaryWriter.cpp | 10 +--- 3 files changed, 30 insertions(+), 36 deletions(-) diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index 92ccfc65a..ccedac27e 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -14,6 +14,7 @@ #include "FileWriter.hpp" #include "streaming_compression/zstd/Compressor.hpp" #include "TraceableException.hpp" +#include "dictionary_utils.hpp" /** * Template class for performing operations on dictionaries and writing them to disk @@ -58,11 +59,13 @@ class DictionaryWriter { void write_header_and_flush_to_disk (); /** - * Adds the str_value of entry into the str_to_id map. - * Raises an error if the entry already exists in the map - * @param entry + * Preload dictionary and populate the hash map with str values + * From the input dictionary + * @param dictionary_decompressor + * @param dictionary_file_reader */ - void insert_non_duplicate_value_into_hash_map (const EntryType& entry); + void preload_dictionary_value_to_map (streaming_compression::zstd::Decompressor& dictionary_decompressor, + FileReader& dictionary_file_reader); /** * Adds the given segment and IDs to the segment index @@ -168,22 +171,29 @@ void DictionaryWriter::write_header_and_flush_to_di } template -void DictionaryWriter::insert_non_duplicate_value_into_hash_map (const EntryType& entry) { +void DictionaryWriter::preload_dictionary_value_to_map (streaming_compression::zstd::Decompressor& dictionary_decompressor, + FileReader& dictionary_file_reader) { + + auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); + EntryType entry; + for (size_t i = 0; i < num_dictionary_entries; ++i) { + entry.read_from_file(dictionary_decompressor); + const auto& str_value = entry.get_value(); + if (m_value_to_id.find(str_value) != m_value_to_id.end()) { + SPDLOG_ERROR("entry value already exists"); + throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); + } + if (m_next_id > m_max_id) { + SPDLOG_ERROR("DictionaryWriter ran out of IDs."); + throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__); + } - const auto& str_value = entry.get_value(); - if (m_value_to_id.find(str_value) != m_value_to_id.end()) { - SPDLOG_ERROR("entry value already exists"); - throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); - } - if (m_next_id > m_max_id) { - SPDLOG_ERROR("DictionaryWriter ran out of IDs."); - throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__); - } + DictionaryIdType id = m_next_id; + m_next_id++; + m_value_to_id[str_value] = id; - // Assign ID - DictionaryIdType id = m_next_id; - m_next_id++; - m_value_to_id[str_value] = id; + entry.clear(); + } } template diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index f39a81aeb..f4de6380b 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -20,15 +20,7 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, segment_index_file_reader, segment_index_decompressor); - auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); - - // Add new dictionary entries values into hash map - LogTypeDictionaryEntry logtype_entry; - for (size_t i = 0; i < num_dictionary_entries; ++i) { - logtype_entry.read_from_file(dictionary_decompressor); - insert_non_duplicate_value_into_hash_map(logtype_entry); - logtype_entry.clear(); - } + preload_dictionary_value_to_map(dictionary_decompressor, dictionary_file_reader); segment_index_decompressor.close(); segment_index_file_reader.close(); diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index c573b69ec..d122fc94d 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -23,15 +23,7 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, segment_index_file_reader, segment_index_decompressor); - auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); - - // Add new dictionary entries values into hash map - VariableDictionaryEntry var_dict_entry; - for (size_t i = 0; i < num_dictionary_entries; ++i) { - var_dict_entry.read_from_file(dictionary_decompressor); - insert_non_duplicate_value_into_hash_map(var_dict_entry); - var_dict_entry.clear(); - } + preload_dictionary_value_to_map(dictionary_decompressor, dictionary_file_reader); segment_index_decompressor.close(); segment_index_file_reader.close(); From c21ac31d41ba751fd723bc8e40481c114ad20b34 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Sat, 27 Nov 2021 11:44:16 -0500 Subject: [PATCH 09/13] use unique_ptr for dynamically allocated set. replace for loop with direct insertion --- .../core/src/streaming_archive/writer/File.cpp | 18 +++++------------- .../core/src/streaming_archive/writer/File.hpp | 2 +- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/components/core/src/streaming_archive/writer/File.cpp b/components/core/src/streaming_archive/writer/File.cpp index 0b51270e0..dbae37436 100644 --- a/components/core/src/streaming_archive/writer/File.cpp +++ b/components/core/src/streaming_archive/writer/File.cpp @@ -12,7 +12,7 @@ namespace streaming_archive { namespace writer { if (m_is_written_out) { throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__); } - m_variable_ids = new std::unordered_set; + m_variable_ids = std::make_unique>(); m_is_open = true; } @@ -25,7 +25,6 @@ namespace streaming_archive { namespace writer { // Add file's logtype and variable IDs to respective segment sets auto logtype_ids = m_logtypes.data(); - auto variables = m_variables.data(); append_logtype_and_var_ids_to_segment_sets(logtype_ids, m_logtypes.size(), segment_logtype_ids, segment_var_ids); // Append files to segment @@ -38,15 +37,12 @@ namespace streaming_archive { namespace writer { set_segment_metadata(segment.get_id(), segment_timestamps_uncompressed_pos, segment_logtypes_uncompressed_pos, segment_variables_uncompressed_pos); m_segmentation_state = SegmentationState_MovingToSegment; - // Mark file as written out and clear in-memory columns + // Mark file as written out and clear in-memory columns and clear the in-memory data (except metadata) m_is_written_out = true; m_timestamps.clear(); m_logtypes.clear(); m_variables.clear(); - - // release the memory - delete m_variable_ids; - m_variable_ids = nullptr; + m_variable_ids.reset(nullptr); } void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, @@ -57,9 +53,7 @@ namespace streaming_archive { namespace writer { m_variables.push_back_all(encoded_vars); // Insert message's variable IDs into the file's variable ID set - for (const auto& id : added_var_ids) { - m_variable_ids->emplace(id); - } + m_variable_ids->insert(added_var_ids.cbegin(), added_var_ids.cend()); // Update metadata ++m_num_messages; @@ -130,9 +124,7 @@ namespace streaming_archive { namespace writer { segment_logtype_ids.emplace(logtype_id); } // Add variable IDs - for (const variable_dictionary_id_t& id : *m_variable_ids) { - segment_var_ids.emplace(id); - } + segment_var_ids.insert(m_variable_ids->cbegin(), m_variable_ids->cend()); } void File::set_segment_metadata (segment_id_t segment_id, uint64_t segment_timestamps_uncompressed_pos, uint64_t segment_logtypes_uncompressed_pos, diff --git a/components/core/src/streaming_archive/writer/File.hpp b/components/core/src/streaming_archive/writer/File.hpp index c695097e5..a21b6b824 100644 --- a/components/core/src/streaming_archive/writer/File.hpp +++ b/components/core/src/streaming_archive/writer/File.hpp @@ -227,7 +227,7 @@ namespace streaming_archive { namespace writer { PageAllocatedVector m_timestamps; PageAllocatedVector m_logtypes; PageAllocatedVector m_variables; - std::unordered_set* m_variable_ids; + std::unique_ptr> m_variable_ids; // State variables SegmentationState m_segmentation_state; From 366a4639e0a51893f19a62aa9e2396e1ae706793 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Sun, 28 Nov 2021 11:30:15 -0500 Subject: [PATCH 10/13] address some issues in code review --- components/core/src/DictionaryWriter.hpp | 66 ++++++++++++++----- .../core/src/LogTypeDictionaryWriter.cpp | 39 +---------- .../core/src/LogTypeDictionaryWriter.hpp | 8 --- .../core/src/VariableDictionaryEntry.cpp | 4 -- .../core/src/VariableDictionaryEntry.hpp | 2 +- .../core/src/VariableDictionaryWriter.cpp | 37 +---------- .../core/src/VariableDictionaryWriter.hpp | 9 --- .../src/streaming_archive/writer/File.cpp | 9 +-- .../tests/test-EncodedVariableInterpreter.cpp | 12 ++-- 9 files changed, 64 insertions(+), 122 deletions(-) diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index ccedac27e..720fec583 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -2,6 +2,7 @@ #define DICTIONARYWRITER_HPP // C++ standard libraries +#include #include #include #include @@ -11,10 +12,10 @@ // Project headers #include "Defs.h" +#include "dictionary_utils.hpp" #include "FileWriter.hpp" #include "streaming_compression/zstd/Compressor.hpp" #include "TraceableException.hpp" -#include "dictionary_utils.hpp" /** * Template class for performing operations on dictionaries and writing them to disk @@ -59,13 +60,12 @@ class DictionaryWriter { void write_header_and_flush_to_disk (); /** - * Preload dictionary and populate the hash map with str values - * From the input dictionary - * @param dictionary_decompressor - * @param dictionary_file_reader + * Opens dictionary, loads entries, and then sets it up for writing + * @param dictionary_path + * @param segment_index_path + * @param max_id */ - void preload_dictionary_value_to_map (streaming_compression::zstd::Decompressor& dictionary_decompressor, - FileReader& dictionary_file_reader); + void open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, const variable_dictionary_id_t max_id); /** * Adds the given segment and IDs to the segment index @@ -171,29 +171,59 @@ void DictionaryWriter::write_header_and_flush_to_di } template -void DictionaryWriter::preload_dictionary_value_to_map (streaming_compression::zstd::Decompressor& dictionary_decompressor, - FileReader& dictionary_file_reader) { +void DictionaryWriter::open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, + const variable_dictionary_id_t max_id) +{ + if (m_is_open) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); + } + + m_max_id = max_id; + + FileReader dictionary_file_reader; + streaming_compression::zstd::Decompressor dictionary_decompressor; + FileReader segment_index_file_reader; + streaming_compression::zstd::Decompressor segment_index_decompressor; + constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB + open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, + segment_index_file_reader, segment_index_decompressor); auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); + m_next_id = num_dictionary_entries; + if (m_next_id > m_max_id) { + SPDLOG_ERROR("DictionaryWriter ran out of IDs."); + throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__); + } + // Loads entries from the given decompressor and file EntryType entry; for (size_t i = 0; i < num_dictionary_entries; ++i) { entry.read_from_file(dictionary_decompressor); const auto& str_value = entry.get_value(); - if (m_value_to_id.find(str_value) != m_value_to_id.end()) { - SPDLOG_ERROR("entry value already exists"); + if (m_value_to_id.count(str_value)) { + SPDLOG_ERROR("Entry's value already exists in dictionary"); throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); } - if (m_next_id > m_max_id) { - SPDLOG_ERROR("DictionaryWriter ran out of IDs."); - throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__); - } - DictionaryIdType id = m_next_id; - m_next_id++; + auto id = entry.get_id(); m_value_to_id[str_value] = id; - + m_data_size += entry.get_data_size(); entry.clear(); } + + segment_index_decompressor.close(); + segment_index_file_reader.close(); + dictionary_decompressor.close(); + dictionary_file_reader.close(); + + m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); + // Open compressor + m_dictionary_compressor.open(m_dictionary_file_writer); + + m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); + // Open compressor + m_segment_index_compressor.open(m_segment_index_file_writer); + + m_is_open = true; } template diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index f4de6380b..6ace9b7cd 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -5,47 +5,14 @@ using std::string; -void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, logtype_dictionary_id_t max_id) { - if (m_is_open) { - throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); - } - - m_max_id = max_id; - - FileReader dictionary_file_reader; - streaming_compression::zstd::Decompressor dictionary_decompressor; - FileReader segment_index_file_reader; - streaming_compression::zstd::Decompressor segment_index_decompressor; - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, - segment_index_file_reader, segment_index_decompressor); - - preload_dictionary_value_to_map(dictionary_decompressor, dictionary_file_reader); - - segment_index_decompressor.close(); - segment_index_file_reader.close(); - dictionary_decompressor.close(); - dictionary_file_reader.close(); - - m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_dictionary_compressor.open(m_dictionary_file_writer); - - m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_segment_index_compressor.open(m_segment_index_file_writer); - - m_is_open = true; -} - bool LogTypeDictionaryWriter::add_occurrence (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) { - if (logtype_entry.get_value().empty()) { - throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); - } bool is_new_entry = false; const string& value = logtype_entry.get_value(); + if (value.empty()) { + throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); + } const auto ix = m_value_to_id.find(value); if (m_value_to_id.end() != ix) { // Entry exists so get its ID diff --git a/components/core/src/LogTypeDictionaryWriter.hpp b/components/core/src/LogTypeDictionaryWriter.hpp index 6ee5f7327..430f0cdf2 100644 --- a/components/core/src/LogTypeDictionaryWriter.hpp +++ b/components/core/src/LogTypeDictionaryWriter.hpp @@ -28,14 +28,6 @@ class LogTypeDictionaryWriter : public DictionaryWriter(m_value.length()); diff --git a/components/core/src/VariableDictionaryEntry.hpp b/components/core/src/VariableDictionaryEntry.hpp index faa7eac10..f4d45276b 100644 --- a/components/core/src/VariableDictionaryEntry.hpp +++ b/components/core/src/VariableDictionaryEntry.hpp @@ -44,7 +44,7 @@ class VariableDictionaryEntry : public DictionaryEntry */ size_t get_data_size () const; - void clear (); + void clear () { m_value.clear(); } /** * Writes an entry to file diff --git a/components/core/src/VariableDictionaryWriter.cpp b/components/core/src/VariableDictionaryWriter.cpp index d122fc94d..e25bc1a04 100644 --- a/components/core/src/VariableDictionaryWriter.cpp +++ b/components/core/src/VariableDictionaryWriter.cpp @@ -6,42 +6,7 @@ // Project headers #include "dictionary_utils.hpp" -using std::string; - -void VariableDictionaryWriter::open_and_preload (const string& dictionary_path, const string& segment_index_path, variable_dictionary_id_t max_id) { - if (m_is_open) { - throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); - } - - m_max_id = max_id; - - FileReader dictionary_file_reader; - streaming_compression::zstd::Decompressor dictionary_decompressor; - FileReader segment_index_file_reader; - streaming_compression::zstd::Decompressor segment_index_decompressor; - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor, - segment_index_file_reader, segment_index_decompressor); - - preload_dictionary_value_to_map(dictionary_decompressor, dictionary_file_reader); - - segment_index_decompressor.close(); - segment_index_file_reader.close(); - dictionary_decompressor.close(); - dictionary_file_reader.close(); - - m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_dictionary_compressor.open(m_dictionary_file_writer); - - m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING); - // Open compressor - m_segment_index_compressor.open(m_segment_index_file_writer); - - m_is_open = true; -} - -bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dictionary_id_t& id) { +bool VariableDictionaryWriter::add_occurrence (const std::string& value, variable_dictionary_id_t& id) { bool new_entry = false; const auto ix = m_value_to_id.find(value); diff --git a/components/core/src/VariableDictionaryWriter.hpp b/components/core/src/VariableDictionaryWriter.hpp index d5095703d..865b6189c 100644 --- a/components/core/src/VariableDictionaryWriter.hpp +++ b/components/core/src/VariableDictionaryWriter.hpp @@ -23,15 +23,6 @@ class VariableDictionaryWriter : public DictionaryWriter>(); + m_variable_ids = std::make_unique>(); m_is_open = true; } @@ -45,15 +46,15 @@ namespace streaming_archive { namespace writer { m_variable_ids.reset(nullptr); } - void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector& encoded_vars, - const std::vector& added_var_ids, size_t num_uncompressed_bytes) + void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const vector& encoded_vars, + const vector& var_ids, size_t num_uncompressed_bytes) { m_timestamps.push_back(timestamp); m_logtypes.push_back(logtype_id); m_variables.push_back_all(encoded_vars); // Insert message's variable IDs into the file's variable ID set - m_variable_ids->insert(added_var_ids.cbegin(), added_var_ids.cend()); + m_variable_ids->insert(var_ids.cbegin(), var_ids.cend()); // Update metadata ++m_num_messages; diff --git a/components/core/tests/test-EncodedVariableInterpreter.cpp b/components/core/tests/test-EncodedVariableInterpreter.cpp index 5985ddaab..35d9d5ae0 100644 --- a/components/core/tests/test-EncodedVariableInterpreter.cpp +++ b/components/core/tests/test-EncodedVariableInterpreter.cpp @@ -241,25 +241,25 @@ TEST_CASE("EncodedVariableInterpreter", "[EncodedVariableInterpreter]") { // Test encoding vector encoded_vars; - vector added_var_ids; + vector var_ids; vector var_strs = {"4938", to_string(EncodedVariableInterpreter::get_var_dict_id_range_begin()), "-25.5196868642755", "-00.00", "bin/python2.7.3"}; msg = "here is a string with a small int " + var_strs[0] + " and a very large int " + var_strs[1] + " and a double " + var_strs[2] + " and a weird double " + var_strs[3] + " and a str with numbers " + var_strs[4]; LogTypeDictionaryEntry logtype_dict_entry; - EncodedVariableInterpreter::encode_and_add_to_dictionary(msg, logtype_dict_entry, var_dict_writer, encoded_vars, added_var_ids); + EncodedVariableInterpreter::encode_and_add_to_dictionary(msg, logtype_dict_entry, var_dict_writer, encoded_vars, var_ids); var_dict_writer.close(); - // Test added_var_ids is correctly populated + // Test var_ids is correctly populated size_t encoded_var_id_ix = 0; for (const auto& var : encoded_vars) { if(EncodedVariableInterpreter::is_var_dict_id(var)){ - REQUIRE(added_var_ids.size() > encoded_var_id_ix); - REQUIRE(EncodedVariableInterpreter::decode_var_dict_id(var) == added_var_ids[encoded_var_id_ix]); + REQUIRE(var_ids.size() > encoded_var_id_ix); + REQUIRE(EncodedVariableInterpreter::decode_var_dict_id(var) == var_ids[encoded_var_id_ix]); encoded_var_id_ix++; } } - REQUIRE(added_var_ids.size() == encoded_var_id_ix); + REQUIRE(var_ids.size() == encoded_var_id_ix); // Open reader VariableDictionaryReader var_dict_reader; From 9f97f797fbadd937d39bda11c61d290c5b4214a5 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Sun, 28 Nov 2021 12:20:33 -0500 Subject: [PATCH 11/13] Move LogEntry off from stack to avoid unnecessary allocation and destruction --- components/core/src/EncodedVariableInterpreter.cpp | 1 + components/core/src/LogTypeDictionaryWriter.hpp | 3 +-- components/core/src/VariableDictionaryWriter.hpp | 4 ++-- .../core/src/streaming_archive/writer/Archive.cpp | 10 +++------- .../core/src/streaming_archive/writer/Archive.hpp | 4 ++-- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index 04d734b56..e7f372169 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -211,6 +211,7 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes size_t next_delim_pos = 0; size_t last_var_end_pos = 0; string var_str; + logtype_dict_entry.clear(); // To avoid reallocating the logtype as we append to it, reserve enough space to hold the entire message logtype_dict_entry.reserve_constant_length(message.length()); while (logtype_dict_entry.parse_next_var(message, tok_begin_pos, next_delim_pos, last_var_end_pos, var_str)) { diff --git a/components/core/src/LogTypeDictionaryWriter.hpp b/components/core/src/LogTypeDictionaryWriter.hpp index 430f0cdf2..b3ef75bdd 100644 --- a/components/core/src/LogTypeDictionaryWriter.hpp +++ b/components/core/src/LogTypeDictionaryWriter.hpp @@ -29,8 +29,7 @@ class LogTypeDictionaryWriter : public DictionaryWriter(); - // Open variable dictionary string var_dict_path = archive_path_string + '/' + cVarDictFilename; string var_dict_segment_index_path = archive_path_string + '/' + cVarSegmentIndexFilename; @@ -209,7 +206,7 @@ namespace streaming_archive { namespace writer { write_dir_snapshot(); m_logtype_dict.close(); - m_logtype_dict_entry_wrapper.reset(nullptr); + m_logtype_dict_entry.clear(); m_var_dict.close(); if (::close(m_segments_dir_fd) != 0) { @@ -266,10 +263,9 @@ namespace streaming_archive { namespace writer { void Archive::write_msg (File& file, epochtime_t timestamp, const string& message, size_t num_uncompressed_bytes) { vector encoded_vars; vector var_ids; - LogTypeDictionaryEntry logtype_entry; - EncodedVariableInterpreter::encode_and_add_to_dictionary(message, logtype_entry, m_var_dict, encoded_vars, var_ids); + EncodedVariableInterpreter::encode_and_add_to_dictionary(message, m_logtype_dict_entry, m_var_dict, encoded_vars, var_ids); logtype_dictionary_id_t logtype_id; - m_logtype_dict.add_occurrence(logtype_entry, logtype_id); + m_logtype_dict.add_occurrence(m_logtype_dict_entry, logtype_id); file.write_encoded_msg(timestamp, logtype_id, encoded_vars, var_ids, num_uncompressed_bytes); } diff --git a/components/core/src/streaming_archive/writer/Archive.hpp b/components/core/src/streaming_archive/writer/Archive.hpp index 1ec5c6d68..757b3d262 100644 --- a/components/core/src/streaming_archive/writer/Archive.hpp +++ b/components/core/src/streaming_archive/writer/Archive.hpp @@ -243,8 +243,8 @@ namespace streaming_archive { namespace writer { int m_segments_dir_fd; LogTypeDictionaryWriter m_logtype_dict; - // Wrapper to hold logtype dictionary entry that's preallocated for performance - std::unique_ptr m_logtype_dict_entry_wrapper; + // Holds preallocated logtype dictionary entry for performance + LogTypeDictionaryEntry m_logtype_dict_entry; VariableDictionaryWriter m_var_dict; boost::uuids::random_generator m_uuid_generator; From 07808cdee4120ab89240dcfe05619614da74c66e Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Sun, 28 Nov 2021 15:42:41 -0500 Subject: [PATCH 12/13] renamed add_occurrence function and slightly modified how next_id is assigned in preload flow --- components/core/src/DictionaryWriter.hpp | 7 ++++--- components/core/src/EncodedVariableInterpreter.cpp | 2 +- components/core/src/LogTypeDictionaryWriter.cpp | 2 +- components/core/src/LogTypeDictionaryWriter.hpp | 2 +- components/core/src/VariableDictionaryWriter.cpp | 2 +- components/core/src/VariableDictionaryWriter.hpp | 2 +- components/core/src/streaming_archive/writer/Archive.cpp | 2 +- 7 files changed, 10 insertions(+), 9 deletions(-) diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index 720fec583..322af42e0 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -189,12 +189,11 @@ void DictionaryWriter::open_and_preload (const std: segment_index_file_reader, segment_index_decompressor); auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader); - m_next_id = num_dictionary_entries; - if (m_next_id > m_max_id) { + if (num_dictionary_entries > m_max_id) { SPDLOG_ERROR("DictionaryWriter ran out of IDs."); throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__); } - // Loads entries from the given decompressor and file + // Loads entries from the given dictionary file EntryType entry; for (size_t i = 0; i < num_dictionary_entries; ++i) { entry.read_from_file(dictionary_decompressor); @@ -210,6 +209,8 @@ void DictionaryWriter::open_and_preload (const std: entry.clear(); } + m_next_id = num_dictionary_entries; + segment_index_decompressor.close(); segment_index_file_reader.close(); dictionary_decompressor.close(); diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index e7f372169..ff55250b6 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -224,7 +224,7 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes } else { // Variable string looks like a dictionary variable, so encode it as so variable_dictionary_id_t id; - var_dict.add_occurrence(var_str, id); + var_dict.add_entry(var_str, id); encoded_var = encode_var_dict_id(id); var_ids.push_back(id); diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index 6ace9b7cd..f2355d908 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -5,7 +5,7 @@ using std::string; -bool LogTypeDictionaryWriter::add_occurrence (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) { +bool LogTypeDictionaryWriter::add_entry (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) { bool is_new_entry = false; diff --git a/components/core/src/LogTypeDictionaryWriter.hpp b/components/core/src/LogTypeDictionaryWriter.hpp index b3ef75bdd..55c5af44a 100644 --- a/components/core/src/LogTypeDictionaryWriter.hpp +++ b/components/core/src/LogTypeDictionaryWriter.hpp @@ -33,7 +33,7 @@ class LogTypeDictionaryWriter : public DictionaryWriter var_ids; EncodedVariableInterpreter::encode_and_add_to_dictionary(message, m_logtype_dict_entry, m_var_dict, encoded_vars, var_ids); logtype_dictionary_id_t logtype_id; - m_logtype_dict.add_occurrence(m_logtype_dict_entry, logtype_id); + m_logtype_dict.add_entry(m_logtype_dict_entry, logtype_id); file.write_encoded_msg(timestamp, logtype_id, encoded_vars, var_ids, num_uncompressed_bytes); } From 17d256b1c38ef9555a840866170cc319f3720428 Mon Sep 17 00:00:00 2001 From: Haiqi <14502009+haiqi96@users.noreply.github.com> Date: Sun, 28 Nov 2021 16:31:55 -0500 Subject: [PATCH 13/13] Fixed a few small places for cleaner code --- components/core/src/DictionaryWriter.hpp | 7 +++---- components/core/src/LogTypeDictionaryWriter.cpp | 1 - components/core/src/VariableDictionaryWriter.hpp | 4 ++-- components/core/src/streaming_archive/writer/Archive.hpp | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/components/core/src/DictionaryWriter.hpp b/components/core/src/DictionaryWriter.hpp index 322af42e0..5b1af4d0a 100644 --- a/components/core/src/DictionaryWriter.hpp +++ b/components/core/src/DictionaryWriter.hpp @@ -65,7 +65,7 @@ class DictionaryWriter { * @param segment_index_path * @param max_id */ - void open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, const variable_dictionary_id_t max_id); + void open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, variable_dictionary_id_t max_id); /** * Adds the given segment and IDs to the segment index @@ -196,6 +196,7 @@ void DictionaryWriter::open_and_preload (const std: // Loads entries from the given dictionary file EntryType entry; for (size_t i = 0; i < num_dictionary_entries; ++i) { + entry.clear(); entry.read_from_file(dictionary_decompressor); const auto& str_value = entry.get_value(); if (m_value_to_id.count(str_value)) { @@ -203,10 +204,8 @@ void DictionaryWriter::open_and_preload (const std: throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__); } - auto id = entry.get_id(); - m_value_to_id[str_value] = id; + m_value_to_id[str_value] = entry.get_id();; m_data_size += entry.get_data_size(); - entry.clear(); } m_next_id = num_dictionary_entries; diff --git a/components/core/src/LogTypeDictionaryWriter.cpp b/components/core/src/LogTypeDictionaryWriter.cpp index f2355d908..1960a1e46 100644 --- a/components/core/src/LogTypeDictionaryWriter.cpp +++ b/components/core/src/LogTypeDictionaryWriter.cpp @@ -6,7 +6,6 @@ using std::string; bool LogTypeDictionaryWriter::add_entry (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) { - bool is_new_entry = false; const string& value = logtype_entry.get_value(); diff --git a/components/core/src/VariableDictionaryWriter.hpp b/components/core/src/VariableDictionaryWriter.hpp index 3f62c61ab..329d1faf7 100644 --- a/components/core/src/VariableDictionaryWriter.hpp +++ b/components/core/src/VariableDictionaryWriter.hpp @@ -24,9 +24,9 @@ class VariableDictionaryWriter : public DictionaryWriter