Skip to content

Commit

Permalink
1. Remove uncommitted entries from memory during compression. 2. Updated…
Browse files Browse the repository at this point in the history
… how ids_in_segment gets populated
  • Loading branch information
haiqi96 committed Nov 26, 2021
1 parent 1d9601f commit 9233883
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 99 deletions.
54 changes: 10 additions & 44 deletions components/core/src/DictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class DictionaryWriter {
const EntryType* get_entry (DictionaryIdType id) const;

/**
* Writes uncommitted dictionary entries to file
* Writes dict_type count and flush compressors to file
*/
void write_uncommitted_entries_to_disk ();
void write_dictionary_info_to_disk ();

/**
* Adds the given segment and IDs to the segment index
Expand All @@ -85,22 +85,19 @@ class DictionaryWriter {

protected:
// Types
typedef std::unordered_map<std::string, EntryType*> value_to_entry_t;
typedef std::unordered_map<DictionaryIdType, EntryType*> id_to_entry_t;
typedef std::unordered_map<std::string, DictionaryIdType> value_to_id_t;

// Variables
bool m_is_open;

// Variables related to on-disk storage
std::vector<EntryType*> m_uncommitted_entries;
FileWriter m_dictionary_file_writer;
streaming_compression::zstd::Compressor m_dictionary_compressor;
FileWriter m_segment_index_file_writer;
streaming_compression::zstd::Compressor m_segment_index_compressor;
size_t m_num_segments_in_index;

value_to_entry_t m_value_to_entry;
id_to_entry_t m_id_to_entry;
value_to_id_t m_value_to_id;
DictionaryIdType m_next_id;
DictionaryIdType m_max_id;

Expand All @@ -110,12 +107,7 @@ class DictionaryWriter {

template <typename DictionaryIdType, typename EntryType>
DictionaryWriter<DictionaryIdType, EntryType>::~DictionaryWriter () {
if (false == m_uncommitted_entries.empty()) {
SPDLOG_ERROR("DictionaryWriter contains uncommitted entries while being destroyed - possible data loss.");
}
for (const auto& value_entry_pair : m_value_to_entry) {
delete value_entry_pair.second;
}
// Do nothing
}

template <typename DictionaryIdType, typename EntryType>
Expand Down Expand Up @@ -151,60 +143,34 @@ void DictionaryWriter<DictionaryIdType, EntryType>::close () {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

write_uncommitted_entries_to_disk();
write_dictionary_info_to_disk();
m_segment_index_compressor.close();
m_segment_index_file_writer.close();
m_dictionary_compressor.close();
m_dictionary_file_writer.close();

// Delete entries and clear maps
for (const auto& value_entry_pair : m_value_to_entry) {
delete value_entry_pair.second;
}
m_id_to_entry.clear();
m_value_to_entry.clear();
// clear maps
m_value_to_id.clear();

m_is_open = false;
}

template <typename DictionaryIdType, typename EntryType>
const EntryType* DictionaryWriter<DictionaryIdType, EntryType>::get_entry (DictionaryIdType id) const {
void DictionaryWriter<DictionaryIdType, EntryType>::write_dictionary_info_to_disk () {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (m_id_to_entry.count(id) == 0) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
return m_id_to_entry.at(id);
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::write_uncommitted_entries_to_disk () {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (m_uncommitted_entries.empty()) {
// Nothing to do
return;
}

for (auto entry : m_uncommitted_entries) {
entry->write_to_file(m_dictionary_compressor);
}

// Update header
auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos();
m_dictionary_file_writer.seek_from_begin(0);
m_dictionary_file_writer.write_numeric_value<uint64_t>(m_id_to_entry.size());
m_dictionary_file_writer.write_numeric_value<uint64_t>(m_value_to_id.size());
m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos);

m_segment_index_compressor.flush();
m_segment_index_file_writer.flush();
m_dictionary_compressor.flush();
m_dictionary_file_writer.flush();
m_uncommitted_entries.clear();
}

template <typename DictionaryIdType, typename EntryType>
Expand Down
4 changes: 3 additions & 1 deletion components/core/src/EncodedVariableInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ void EncodedVariableInterpreter::convert_encoded_double_to_string (encoded_varia
}

void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& message, LogTypeDictionaryEntry& logtype_dict_entry,
VariableDictionaryWriter& var_dict, vector<encoded_variable_t>& encoded_vars)
VariableDictionaryWriter& var_dict, vector<encoded_variable_t>& encoded_vars,
vector<variable_dictionary_id_t>& added_vars)
{
// Extract all variables and add to dictionary while building logtype
size_t tok_begin_pos = 0;
Expand All @@ -225,6 +226,7 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes
variable_dictionary_id_t id;
var_dict.add_occurrence(var_str, id);
encoded_var = encode_var_dict_id(id);
added_vars.push_back(id);

logtype_dict_entry.add_non_double_var();
}
Expand Down
2 changes: 1 addition & 1 deletion components/core/src/EncodedVariableInterpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class EncodedVariableInterpreter {
* @param encoded_vars
*/
static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict,
std::vector<encoded_variable_t>& encoded_vars);
std::vector<encoded_variable_t>& encoded_vars, std::vector<variable_dictionary_id_t>& added_vars);
/**
* Decodes all variables and decompresses them into a message
* @param logtype_dict_entry
Expand Down
21 changes: 10 additions & 11 deletions components/core/src/LogTypeDictionaryWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa
logtype_dict_entry_wrapper->read_from_file(dictionary_decompressor);
add_occurrence(logtype_dict_entry_wrapper, id);
}
m_uncommitted_entries.clear();

segment_index_decompressor.close();
segment_index_file_reader.close();
Expand All @@ -56,11 +55,10 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr<LogTypeDictionaryE
bool is_new_entry = false;

const string& value = entry.get_value();
const auto ix = m_value_to_entry.find(value);
if (m_value_to_entry.end() != ix) {
// Entry exists so increment its count
auto& existing_entry = ix->second;
logtype_id = existing_entry->get_id();
const auto ix = m_value_to_id.find(value);
if (m_value_to_id.end() != ix) {
// Entry exists so get its ID
logtype_id = ix->second;
} else {
// Dictionary entry doesn't exist so create it

Expand Down Expand Up @@ -90,16 +88,17 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr<LogTypeDictionaryE

// Insert new entry into dictionary
auto entry_ptr = entry_wrapper.release();
m_value_to_entry[value] = entry_ptr;
m_id_to_entry[logtype_id] = entry_ptr;

// Mark ID as dirty
m_uncommitted_entries.emplace_back(entry_ptr);
m_value_to_id[value] = logtype_id;

is_new_entry = true;

// TODO: This doesn't account for the segment index that's constantly updated
// Nor does it account that compression doesn't store the whole entry in the memory
m_data_size += entry_ptr->get_data_size();

// write the new entry to the compressor and free up the memory
entry_ptr->write_to_file(m_dictionary_compressor);
delete entry_ptr;
}
return is_new_entry;
}
18 changes: 9 additions & 9 deletions components/core/src/VariableDictionaryWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path,
var_dict_entry.read_from_file(dictionary_decompressor);
add_occurrence(var_dict_entry.get_value(), id);
}
m_uncommitted_entries.clear();

segment_index_decompressor.close();
segment_index_file_reader.close();
Expand All @@ -53,9 +52,9 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path,
bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dictionary_id_t& id) {
bool new_entry = false;

const auto ix = m_value_to_entry.find(value);
if (m_value_to_entry.end() != ix) {
id = ix->second->get_id();
const auto ix = m_value_to_id.find(value);
if (m_value_to_id.end() != ix) {
id = ix->second;
} else {
// Entry doesn't exist so create it

Expand All @@ -70,16 +69,17 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic

// Insert the ID obtained from the database into the dictionary
auto* entry = new VariableDictionaryEntry(value, id);
m_value_to_entry[value] = entry;
m_id_to_entry[id] = entry;

// Mark ID as dirty
m_uncommitted_entries.emplace_back(entry);
m_value_to_id[value] = id;

new_entry = true;

// TODO: This doesn't account for the segment index that's constantly updated
// Nor does it account that compression doesn't store the whole entry in the memory
m_data_size += entry->get_data_size();

// write the new entry to the compressor and free up the memory
entry->write_to_file(m_dictionary_compressor);
delete entry;
}
return new_entry;
}
14 changes: 8 additions & 6 deletions components/core/src/streaming_archive/writer/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,15 @@ namespace streaming_archive { namespace writer {

void Archive::write_msg (File& file, epochtime_t timestamp, const string& message, size_t num_uncompressed_bytes) {
vector<encoded_variable_t> encoded_vars;
EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars);
// ids of variable type in the message
vector<variable_dictionary_id_t> added_var_ids;
EncodedVariableInterpreter::encode_and_add_to_dictionary(message, *m_logtype_dict_entry_wrapper, m_var_dict, encoded_vars, added_var_ids);
logtype_dictionary_id_t logtype_id;
if (m_logtype_dict.add_occurrence(m_logtype_dict_entry_wrapper, logtype_id)) {
m_logtype_dict_entry_wrapper = make_unique<LogTypeDictionaryEntry>();
}

file.write_encoded_msg(timestamp, logtype_id, encoded_vars, num_uncompressed_bytes);
file.write_encoded_msg(timestamp, logtype_id, encoded_vars, added_var_ids, num_uncompressed_bytes);
}

void Archive::write_dir_snapshot () {
Expand All @@ -284,8 +286,8 @@ namespace streaming_archive { namespace writer {
#endif

// Flush dictionaries
m_logtype_dict.write_uncommitted_entries_to_disk();
m_var_dict.write_uncommitted_entries_to_disk();
m_logtype_dict.write_dictionary_info_to_disk();
m_var_dict.write_dictionary_info_to_disk();
}

void Archive::append_file_to_segment (File*& file, Segment& segment, unordered_set<logtype_dictionary_id_t>& logtype_ids_in_segment,
Expand Down Expand Up @@ -369,8 +371,8 @@ namespace streaming_archive { namespace writer {
#endif

// Flush dictionaries
m_logtype_dict.write_uncommitted_entries_to_disk();
m_var_dict.write_uncommitted_entries_to_disk();
m_logtype_dict.write_dictionary_info_to_disk();
m_var_dict.write_dictionary_info_to_disk();

for (auto file : files) {
file->mark_as_in_committed_segment();
Expand Down
42 changes: 18 additions & 24 deletions components/core/src/streaming_archive/writer/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace streaming_archive { namespace writer {
if (m_is_written_out) {
throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__);
}

m_variable_ids = new std::unordered_set<variable_dictionary_id_t>;
m_is_open = true;
}

Expand Down Expand Up @@ -44,15 +44,26 @@ namespace streaming_archive { namespace writer {
m_timestamps.clear();
m_logtypes.clear();
m_variables.clear();

// release the memory
m_variable_ids->clear();
delete m_variable_ids;
m_variable_ids = nullptr;
}

void File::write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector<encoded_variable_t>& encoded_vars,
size_t num_uncompressed_bytes)
const std::vector<variable_dictionary_id_t>& added_var_ids, size_t num_uncompressed_bytes)
{
m_timestamps.push_back(timestamp);
m_logtypes.push_back(logtype_id);
m_variables.push_back_all(encoded_vars);

// insert ids seen from log message to the file
// duplicate ids will be handled by unordered_set internally
for(const auto& id : added_var_ids){
m_variable_ids->emplace(id);
}

// Update metadata
++m_num_messages;
m_num_variables += encoded_vars.size();
Expand Down Expand Up @@ -117,31 +128,14 @@ namespace streaming_archive { namespace writer {
unordered_set<logtype_dictionary_id_t>& segment_logtype_ids,
unordered_set<variable_dictionary_id_t>& segment_var_ids)
{
size_t var_ix = 0;
// Add logtype to set
for (size_t i = 0; i < num_logtypes; ++i) {
// Add logtype to set
auto logtype_id = logtype_ids[i];
segment_logtype_ids.emplace(logtype_id);

// Get logtype dictionary entry
auto logtype_dict_entry_ptr = logtype_dict.get_entry(logtype_id);
auto& logtype_dict_entry = *logtype_dict_entry_ptr;

// Get number of variables in logtype
auto msg_num_vars = logtype_dict_entry.get_num_vars();
if (var_ix + msg_num_vars > num_vars) {
throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__);
}

// If variable is a variable dictionary ID, decode it and add it to the set
for (size_t msg_var_ix = 0; msg_var_ix < msg_num_vars; ++msg_var_ix, ++var_ix) {
if (LogTypeDictionaryEntry::VarDelim::NonDouble == logtype_dict_entry.get_var_delim(msg_var_ix)) {
auto var = vars[var_ix];
if (EncodedVariableInterpreter::is_var_dict_id(var)) {
segment_var_ids.emplace(EncodedVariableInterpreter::decode_var_dict_id(var));
}
}
}
}
// Add vartype to set
for(const variable_dictionary_id_t& id : *m_variable_ids){
segment_var_ids.emplace(id);
}
}

Expand Down
8 changes: 5 additions & 3 deletions components/core/src/streaming_archive/writer/File.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ namespace streaming_archive { namespace writer {
m_segmentation_state(SegmentationState_NotInSegment),
m_is_metadata_clean(false),
m_is_written_out(false),
m_is_open(false)
m_is_open(false),
m_variable_ids(nullptr)
{}

// Destructor
Expand All @@ -84,7 +85,7 @@ namespace streaming_archive { namespace writer {
* @param num_uncompressed_bytes
*/
void write_encoded_msg (epochtime_t timestamp, logtype_dictionary_id_t logtype_id, const std::vector<encoded_variable_t>& encoded_vars,
size_t num_uncompressed_bytes);
const std::vector<variable_dictionary_id_t>& added_vars_ids, size_t num_uncompressed_bytes);

/**
* Changes timestamp pattern in use at current message in file
Expand Down Expand Up @@ -182,7 +183,7 @@ namespace streaming_archive { namespace writer {
* @param segment_logtype_ids
* @param segment_var_ids
*/
static void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids,
void append_logtype_and_var_ids_to_segment_sets (const LogTypeDictionaryWriter& logtype_dict, const logtype_dictionary_id_t* logtype_ids,
size_t num_logtypes, const encoded_variable_t* vars, size_t num_vars,
std::unordered_set<logtype_dictionary_id_t>& segment_logtype_ids,
std::unordered_set<variable_dictionary_id_t>& segment_var_ids);
Expand Down Expand Up @@ -229,6 +230,7 @@ namespace streaming_archive { namespace writer {
PageAllocatedVector<epochtime_t> m_timestamps;
PageAllocatedVector<logtype_dictionary_id_t> m_logtypes;
PageAllocatedVector<encoded_variable_t> m_variables;
std::unordered_set<variable_dictionary_id_t> * m_variable_ids;

// State variables
SegmentationState m_segmentation_state;
Expand Down

0 comments on commit 9233883

Please sign in to comment.