Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1. Remove uncommitted entries from memory during compression. 2. Update how ids_in_segment gets populated #40

Merged
merged 13 commits into from
Nov 28, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 42 additions & 53 deletions components/core/src/DictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "FileWriter.hpp"
#include "streaming_compression/zstd/Compressor.hpp"
#include "TraceableException.hpp"
#include "dictionary_utils.hpp"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Template class for performing operations on dictionaries and writing them to disk
Expand All @@ -38,7 +39,7 @@ class DictionaryWriter {
// Constructors
DictionaryWriter () : m_is_open(false) {}

~DictionaryWriter ();
~DictionaryWriter () = default;

// Methods
/**
Expand All @@ -53,16 +54,18 @@ class DictionaryWriter {
void close ();

/**
* Gets the entry with the given ID
* @param id
* @return Pointer to the entry with the given ID
* Writes the dictionary's header and flushes unwritten content to disk
*/
const EntryType* get_entry (DictionaryIdType id) const;
void write_header_and_flush_to_disk ();

/**
* Writes uncommitted dictionary entries to file
* Preloads the dictionary and populates the value-to-ID hash map with the entries' string values
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
* read from the input dictionary
* @param dictionary_decompressor
* @param dictionary_file_reader
*/
void write_uncommitted_entries_to_disk ();
void preload_dictionary_value_to_map (streaming_compression::zstd::Decompressor& dictionary_decompressor,
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
FileReader& dictionary_file_reader);

/**
* Adds the given segment and IDs to the segment index
Expand All @@ -85,39 +88,26 @@ class DictionaryWriter {

protected:
// Types
typedef std::unordered_map<std::string, EntryType*> value_to_entry_t;
typedef std::unordered_map<DictionaryIdType, EntryType*> id_to_entry_t;
typedef std::unordered_map<std::string, DictionaryIdType> value_to_id_t;

// Variables
bool m_is_open;

// Variables related to on-disk storage
std::vector<EntryType*> m_uncommitted_entries;
FileWriter m_dictionary_file_writer;
streaming_compression::zstd::Compressor m_dictionary_compressor;
FileWriter m_segment_index_file_writer;
streaming_compression::zstd::Compressor m_segment_index_compressor;
size_t m_num_segments_in_index;

value_to_entry_t m_value_to_entry;
id_to_entry_t m_id_to_entry;
value_to_id_t m_value_to_id;
DictionaryIdType m_next_id;
DictionaryIdType m_max_id;

// Size (in-memory) of the data contained in the dictionary
size_t m_data_size;
};

template <typename DictionaryIdType, typename EntryType>
DictionaryWriter<DictionaryIdType, EntryType>::~DictionaryWriter () {
if (false == m_uncommitted_entries.empty()) {
SPDLOG_ERROR("DictionaryWriter contains uncommitted entries while being destroyed - possible data loss.");
}
for (const auto& value_entry_pair : m_value_to_entry) {
delete value_entry_pair.second;
}
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::open (const std::string& dictionary_path, const std::string& segment_index_path, DictionaryIdType max_id) {
if (m_is_open) {
Expand Down Expand Up @@ -151,60 +141,59 @@ void DictionaryWriter<DictionaryIdType, EntryType>::close () {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

write_uncommitted_entries_to_disk();
write_header_and_flush_to_disk();
m_segment_index_compressor.close();
m_segment_index_file_writer.close();
m_dictionary_compressor.close();
m_dictionary_file_writer.close();

// Delete entries and clear maps
for (const auto& value_entry_pair : m_value_to_entry) {
delete value_entry_pair.second;
}
m_id_to_entry.clear();
m_value_to_entry.clear();
m_value_to_id.clear();

m_is_open = false;
}

template <typename DictionaryIdType, typename EntryType>
const EntryType* DictionaryWriter<DictionaryIdType, EntryType>::get_entry (DictionaryIdType id) const {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (m_id_to_entry.count(id) == 0) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
return m_id_to_entry.at(id);
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::write_uncommitted_entries_to_disk () {
void DictionaryWriter<DictionaryIdType, EntryType>::write_header_and_flush_to_disk () {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (m_uncommitted_entries.empty()) {
// Nothing to do
return;
}

for (auto entry : m_uncommitted_entries) {
entry->write_to_file(m_dictionary_compressor);
}

// Update header
auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos();
m_dictionary_file_writer.seek_from_begin(0);
m_dictionary_file_writer.write_numeric_value<uint64_t>(m_id_to_entry.size());
m_dictionary_file_writer.write_numeric_value<uint64_t>(m_value_to_id.size());
m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos);

m_segment_index_compressor.flush();
m_segment_index_file_writer.flush();
m_dictionary_compressor.flush();
m_dictionary_file_writer.flush();
m_uncommitted_entries.clear();
}

/**
 * Loads an existing dictionary's values into the value-to-ID map.
 *
 * The number of entries is read from the dictionary's header through the file reader; each entry is then read through the decompressor. Every value is
 * given the next unused ID (IDs are assigned in read order from m_next_id, not taken from the entry that was read).
 * @param dictionary_decompressor Decompressor that the entries are read from
 * @param dictionary_file_reader File reader that the dictionary header is read from
 * @throw OperationFailed with ErrorCode_Corrupt if the same value is read twice
 * @throw OperationFailed with ErrorCode_OutOfBounds if m_next_id exceeds m_max_id
 */
template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::preload_dictionary_value_to_map (streaming_compression::zstd::Decompressor& dictionary_decompressor,
                                                                                     FileReader& dictionary_file_reader) {
    auto total_entries = read_dictionary_header(dictionary_file_reader);

    // A single entry object is reused for every read and cleared after each iteration
    EntryType scratch_entry;
    for (size_t entry_ix = 0; entry_ix < total_entries; ++entry_ix) {
        scratch_entry.read_from_file(dictionary_decompressor);
        const auto& value = scratch_entry.get_value();

        // A repeated value means the on-disk dictionary is inconsistent
        if (m_value_to_id.count(value) > 0) {
            SPDLOG_ERROR("entry value already exists");
            throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__);
        }

        if (m_next_id > m_max_id) {
            SPDLOG_ERROR("DictionaryWriter ran out of IDs.");
            throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__);
        }

        // Consume the next ID first, then record the mapping
        const DictionaryIdType assigned_id = m_next_id++;
        m_value_to_id[value] = assigned_id;

        scratch_entry.clear();
    }
}

template <typename DictionaryIdType, typename EntryType>
Expand Down
5 changes: 3 additions & 2 deletions components/core/src/EncodedVariableInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,14 @@ void EncodedVariableInterpreter::convert_encoded_double_to_string (encoded_varia
}

void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& message, LogTypeDictionaryEntry& logtype_dict_entry,
VariableDictionaryWriter& var_dict, vector<encoded_variable_t>& encoded_vars)
VariableDictionaryWriter& var_dict, vector<encoded_variable_t>& encoded_vars,
vector<variable_dictionary_id_t>& var_ids)
{
// Extract all variables and add to dictionary while building logtype
size_t tok_begin_pos = 0;
size_t next_delim_pos = 0;
size_t last_var_end_pos = 0;
string var_str;
logtype_dict_entry.clear();
// To avoid reallocating the logtype as we append to it, reserve enough space to hold the entire message
logtype_dict_entry.reserve_constant_length(message.length());
while (logtype_dict_entry.parse_next_var(message, tok_begin_pos, next_delim_pos, last_var_end_pos, var_str)) {
Expand All @@ -225,6 +225,7 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes
variable_dictionary_id_t id;
var_dict.add_occurrence(var_str, id);
encoded_var = encode_var_dict_id(id);
var_ids.push_back(id);

logtype_dict_entry.add_non_double_var();
}
Expand Down
3 changes: 2 additions & 1 deletion components/core/src/EncodedVariableInterpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ class EncodedVariableInterpreter {
* @param logtype_dict_entry
* @param var_dict
* @param encoded_vars
* @param var_ids
*/
static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict,
std::vector<encoded_variable_t>& encoded_vars);
std::vector<encoded_variable_t>& encoded_vars, std::vector<variable_dictionary_id_t>& var_ids);
/**
* Decodes all variables and decompresses them into a message
* @param logtype_dict_entry
Expand Down
42 changes: 14 additions & 28 deletions components/core/src/LogTypeDictionaryWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,7 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa
open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor,
segment_index_file_reader, segment_index_decompressor);

auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader);

// Read new dictionary entries
logtype_dictionary_id_t id;
for (size_t i = 0; i < num_dictionary_entries; ++i) {
auto logtype_dict_entry_wrapper = std::make_unique<LogTypeDictionaryEntry>();
logtype_dict_entry_wrapper->read_from_file(dictionary_decompressor);
add_occurrence(logtype_dict_entry_wrapper, id);
}
m_uncommitted_entries.clear();
preload_dictionary_value_to_map(dictionary_decompressor, dictionary_file_reader);
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

segment_index_decompressor.close();
segment_index_file_reader.close();
Expand All @@ -47,20 +38,18 @@ void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_pa
m_is_open = true;
}

bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr<LogTypeDictionaryEntry>& entry_wrapper, logtype_dictionary_id_t& logtype_id) {
if (nullptr == entry_wrapper) {
bool LogTypeDictionaryWriter::add_occurrence (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) {
if (logtype_entry.get_value().empty()) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
auto& entry = *entry_wrapper;

bool is_new_entry = false;

const string& value = entry.get_value();
const auto ix = m_value_to_entry.find(value);
if (m_value_to_entry.end() != ix) {
// Entry exists so increment its count
auto& existing_entry = ix->second;
logtype_id = existing_entry->get_id();
const string& value = logtype_entry.get_value();
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
const auto ix = m_value_to_id.find(value);
if (m_value_to_id.end() != ix) {
// Entry exists so get its ID
logtype_id = ix->second;
} else {
// Dictionary entry doesn't exist so create it

Expand All @@ -81,25 +70,22 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr<LogTypeDictionaryE
} else {
verbosity = LogVerbosity_UNKNOWN;
}
entry.set_verbosity(verbosity);
logtype_entry.set_verbosity(verbosity);

// Assign ID
logtype_id = m_next_id;
++m_next_id;
entry.set_id(logtype_id);
logtype_entry.set_id(logtype_id);

// Insert new entry into dictionary
auto entry_ptr = entry_wrapper.release();
m_value_to_entry[value] = entry_ptr;
m_id_to_entry[logtype_id] = entry_ptr;

// Mark ID as dirty
m_uncommitted_entries.emplace_back(entry_ptr);
m_value_to_id[value] = logtype_id;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

is_new_entry = true;

// TODO: This doesn't account for the segment index that's constantly updated
m_data_size += entry_ptr->get_data_size();
m_data_size += logtype_entry.get_data_size();

logtype_entry.write_to_file(m_dictionary_compressor);
}
return is_new_entry;
}
4 changes: 2 additions & 2 deletions components/core/src/LogTypeDictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class LogTypeDictionaryWriter : public DictionaryWriter<logtype_dictionary_id_t,
/**
* Adds the given entry to the dictionary if its value doesn't already exist. If the entry is new, it is assigned an ID, its value is recorded in the
* value-to-ID map, and the entry is written to the dictionary compressor; otherwise only the existing ID is returned through logtype_id.
* @param entry_wrapper
* @param logtype_entry
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
* @param logtype_id ID of the logtype matching the given entry
*/
bool add_occurrence (std::unique_ptr<LogTypeDictionaryEntry>& entry_wrapper, logtype_dictionary_id_t& logtype_id);
bool add_occurrence (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id);
};

#endif // LOGTYPEDICTIONARYWRITER_HPP
4 changes: 4 additions & 0 deletions components/core/src/VariableDictionaryEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ size_t VariableDictionaryEntry::get_data_size () const {
return sizeof(m_id) + m_value.length() + m_ids_of_segments_containing_entry.size() * sizeof(segment_id_t);
}

// Resets the entry's value string so the object can be reused; no other member is modified here.
void VariableDictionaryEntry::clear () {
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
m_value.clear();
}

void VariableDictionaryEntry::write_to_file (streaming_compression::zstd::Compressor& compressor) const {
compressor.write_numeric_value(m_id);
compressor.write_numeric_value<uint64_t>(m_value.length());
Expand Down
2 changes: 2 additions & 0 deletions components/core/src/VariableDictionaryEntry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class VariableDictionaryEntry : public DictionaryEntry<variable_dictionary_id_t>
*/
size_t get_data_size () const;

void clear ();

/**
* Writes an entry to file
* @param compressor
Expand Down
29 changes: 9 additions & 20 deletions components/core/src/VariableDictionaryWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,7 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path,
open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor,
segment_index_file_reader, segment_index_decompressor);

auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader);

// Read new dictionary entries
variable_dictionary_id_t id;
VariableDictionaryEntry var_dict_entry;
for (size_t i = 0; i < num_dictionary_entries; ++i) {
var_dict_entry.read_from_file(dictionary_decompressor);
add_occurrence(var_dict_entry.get_value(), id);
}
m_uncommitted_entries.clear();
preload_dictionary_value_to_map(dictionary_decompressor, dictionary_file_reader);

segment_index_decompressor.close();
segment_index_file_reader.close();
Expand All @@ -53,9 +44,9 @@ void VariableDictionaryWriter::open_and_preload (const string& dictionary_path,
bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dictionary_id_t& id) {
bool new_entry = false;

const auto ix = m_value_to_entry.find(value);
if (m_value_to_entry.end() != ix) {
id = ix->second->get_id();
const auto ix = m_value_to_id.find(value);
if (m_value_to_id.end() != ix) {
id = ix->second;
} else {
// Entry doesn't exist so create it

Expand All @@ -69,17 +60,15 @@ bool VariableDictionaryWriter::add_occurrence (const string& value, variable_dic
++m_next_id;

// Insert the ID obtained from the database into the dictionary
auto* entry = new VariableDictionaryEntry(value, id);
m_value_to_entry[value] = entry;
m_id_to_entry[id] = entry;

// Mark ID as dirty
m_uncommitted_entries.emplace_back(entry);
auto entry = VariableDictionaryEntry(value, id);
m_value_to_id[value] = id;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

new_entry = true;

// TODO: This doesn't account for the segment index that's constantly updated
m_data_size += entry->get_data_size();
m_data_size += entry.get_data_size();

entry.write_to_file(m_dictionary_compressor);
}
return new_entry;
}
Loading