Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1. Remove uncommitted entries from memory during compression. 2. Updated how ids_in_segment gets populated #40

Merged
merged 13 commits into from
Nov 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 71 additions & 52 deletions components/core/src/DictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define DICTIONARYWRITER_HPP

// C++ standard libraries
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
Expand All @@ -11,6 +12,7 @@

// Project headers
#include "Defs.h"
#include "dictionary_utils.hpp"
#include "FileWriter.hpp"
#include "streaming_compression/zstd/Compressor.hpp"
#include "TraceableException.hpp"
Expand Down Expand Up @@ -38,7 +40,7 @@ class DictionaryWriter {
// Constructors
DictionaryWriter () : m_is_open(false) {}

~DictionaryWriter ();
~DictionaryWriter () = default;

// Methods
/**
Expand All @@ -53,16 +55,17 @@ class DictionaryWriter {
void close ();

/**
* Gets the entry with the given ID
* @param id
* @return Pointer to the entry with the given ID
* Writes the dictionary's header and flushes unwritten content to disk
*/
const EntryType* get_entry (DictionaryIdType id) const;
void write_header_and_flush_to_disk ();

/**
* Writes uncommitted dictionary entries to file
* Opens dictionary, loads entries, and then sets it up for writing
* @param dictionary_path
* @param segment_index_path
* @param max_id
*/
void write_uncommitted_entries_to_disk ();
void open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, variable_dictionary_id_t max_id);

/**
* Adds the given segment and IDs to the segment index
Expand All @@ -85,39 +88,26 @@ class DictionaryWriter {

protected:
// Types
typedef std::unordered_map<std::string, EntryType*> value_to_entry_t;
typedef std::unordered_map<DictionaryIdType, EntryType*> id_to_entry_t;
typedef std::unordered_map<std::string, DictionaryIdType> value_to_id_t;

// Variables
bool m_is_open;

// Variables related to on-disk storage
std::vector<EntryType*> m_uncommitted_entries;
FileWriter m_dictionary_file_writer;
streaming_compression::zstd::Compressor m_dictionary_compressor;
FileWriter m_segment_index_file_writer;
streaming_compression::zstd::Compressor m_segment_index_compressor;
size_t m_num_segments_in_index;

value_to_entry_t m_value_to_entry;
id_to_entry_t m_id_to_entry;
value_to_id_t m_value_to_id;
DictionaryIdType m_next_id;
DictionaryIdType m_max_id;

// Size (in-memory) of the data contained in the dictionary
size_t m_data_size;
};

/**
 * Destructor. Logs an error if m_uncommitted_entries is non-empty (the entries are only reported here, not written), then deletes every entry pointer
 * stored in m_value_to_entry.
 */
template <typename DictionaryIdType, typename EntryType>
DictionaryWriter<DictionaryIdType, EntryType>::~DictionaryWriter () {
// Entries still pending at destruction are reported but not flushed
if (false == m_uncommitted_entries.empty()) {
SPDLOG_ERROR("DictionaryWriter contains uncommitted entries while being destroyed - possible data loss.");
}
// Free the entries referenced by the value-to-entry map
for (const auto& value_entry_pair : m_value_to_entry) {
delete value_entry_pair.second;
}
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::open (const std::string& dictionary_path, const std::string& segment_index_path, DictionaryIdType max_id) {
if (m_is_open) {
Expand Down Expand Up @@ -151,60 +141,89 @@ void DictionaryWriter<DictionaryIdType, EntryType>::close () {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

write_uncommitted_entries_to_disk();
write_header_and_flush_to_disk();
m_segment_index_compressor.close();
m_segment_index_file_writer.close();
m_dictionary_compressor.close();
m_dictionary_file_writer.close();

// Delete entries and clear maps
for (const auto& value_entry_pair : m_value_to_entry) {
delete value_entry_pair.second;
}
m_id_to_entry.clear();
m_value_to_entry.clear();
m_value_to_id.clear();

m_is_open = false;
}

template <typename DictionaryIdType, typename EntryType>
const EntryType* DictionaryWriter<DictionaryIdType, EntryType>::get_entry (DictionaryIdType id) const {
void DictionaryWriter<DictionaryIdType, EntryType>::write_header_and_flush_to_disk () {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

if (m_id_to_entry.count(id) == 0) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
return m_id_to_entry.at(id);
// Update header
auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos();
m_dictionary_file_writer.seek_from_begin(0);
m_dictionary_file_writer.write_numeric_value<uint64_t>(m_value_to_id.size());
m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos);

m_segment_index_compressor.flush();
m_segment_index_file_writer.flush();
m_dictionary_compressor.flush();
m_dictionary_file_writer.flush();
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::write_uncommitted_entries_to_disk () {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
void DictionaryWriter<DictionaryIdType, EntryType>::open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path,
const variable_dictionary_id_t max_id)
{
if (m_is_open) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

if (m_uncommitted_entries.empty()) {
// Nothing to do
return;
m_max_id = max_id;

FileReader dictionary_file_reader;
streaming_compression::zstd::Decompressor dictionary_decompressor;
FileReader segment_index_file_reader;
streaming_compression::zstd::Decompressor segment_index_decompressor;
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB
open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor,
segment_index_file_reader, segment_index_decompressor);

auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader);
if (num_dictionary_entries > m_max_id) {
SPDLOG_ERROR("DictionaryWriter ran out of IDs.");
throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__);
}
// Loads entries from the given dictionary file
EntryType entry;
for (size_t i = 0; i < num_dictionary_entries; ++i) {
entry.clear();
entry.read_from_file(dictionary_decompressor);
const auto& str_value = entry.get_value();
if (m_value_to_id.count(str_value)) {
SPDLOG_ERROR("Entry's value already exists in dictionary");
throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__);
}

for (auto entry : m_uncommitted_entries) {
entry->write_to_file(m_dictionary_compressor);
m_value_to_id[str_value] = entry.get_id();;
m_data_size += entry.get_data_size();
}

// Update header
auto dictionary_file_writer_pos = m_dictionary_file_writer.get_pos();
m_dictionary_file_writer.seek_from_begin(0);
m_dictionary_file_writer.write_numeric_value<uint64_t>(m_id_to_entry.size());
m_dictionary_file_writer.seek_from_begin(dictionary_file_writer_pos);
m_next_id = num_dictionary_entries;

m_segment_index_compressor.flush();
m_segment_index_file_writer.flush();
m_dictionary_compressor.flush();
m_dictionary_file_writer.flush();
m_uncommitted_entries.clear();
segment_index_decompressor.close();
segment_index_file_reader.close();
dictionary_decompressor.close();
dictionary_file_reader.close();

m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING);
// Open compressor
m_dictionary_compressor.open(m_dictionary_file_writer);

m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING);
// Open compressor
m_segment_index_compressor.open(m_segment_index_file_writer);

m_is_open = true;
}

template <typename DictionaryIdType, typename EntryType>
Expand Down
6 changes: 4 additions & 2 deletions components/core/src/EncodedVariableInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ void EncodedVariableInterpreter::convert_encoded_double_to_string (encoded_varia
}

void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& message, LogTypeDictionaryEntry& logtype_dict_entry,
VariableDictionaryWriter& var_dict, vector<encoded_variable_t>& encoded_vars)
VariableDictionaryWriter& var_dict, vector<encoded_variable_t>& encoded_vars,
vector<variable_dictionary_id_t>& var_ids)
{
// Extract all variables and add to dictionary while building logtype
size_t tok_begin_pos = 0;
Expand All @@ -223,8 +224,9 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes
} else {
// Variable string looks like a dictionary variable, so encode it as so
variable_dictionary_id_t id;
var_dict.add_occurrence(var_str, id);
var_dict.add_entry(var_str, id);
encoded_var = encode_var_dict_id(id);
var_ids.push_back(id);

logtype_dict_entry.add_non_double_var();
}
Expand Down
3 changes: 2 additions & 1 deletion components/core/src/EncodedVariableInterpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ class EncodedVariableInterpreter {
* @param logtype_dict_entry
* @param var_dict
* @param encoded_vars
* @param var_ids
*/
static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict,
std::vector<encoded_variable_t>& encoded_vars);
std::vector<encoded_variable_t>& encoded_vars, std::vector<variable_dictionary_id_t>& var_ids);
/**
* Decodes all variables and decompresses them into a message
* @param logtype_dict_entry
Expand Down
78 changes: 15 additions & 63 deletions components/core/src/LogTypeDictionaryWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,62 +5,17 @@

using std::string;

/**
 * Opens an existing dictionary for reading, loads each of its entries through add_occurrence, then reopens the dictionary and segment index files for
 * writing and marks the writer as open.
 * @param dictionary_path
 * @param segment_index_path
 * @param max_id Stored in m_max_id
 * @throw OperationFailed with ErrorCode_NotReady if the writer is already open
 */
void LogTypeDictionaryWriter::open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, logtype_dictionary_id_t max_id) {
if (m_is_open) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

m_max_id = max_id;

// Readers and decompressors are local: they are only needed while preloading
FileReader dictionary_file_reader;
streaming_compression::zstd::Decompressor dictionary_decompressor;
FileReader segment_index_file_reader;
streaming_compression::zstd::Decompressor segment_index_decompressor;
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB
open_dictionary_for_reading(dictionary_path, segment_index_path, cDecompressorFileReadBufferCapacity, dictionary_file_reader, dictionary_decompressor,
segment_index_file_reader, segment_index_decompressor);

auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader);

// Read new dictionary entries
// NOTE: id receives the ID reported by add_occurrence; it is not used afterwards in this function
logtype_dictionary_id_t id;
for (size_t i = 0; i < num_dictionary_entries; ++i) {
auto logtype_dict_entry_wrapper = std::make_unique<LogTypeDictionaryEntry>();
logtype_dict_entry_wrapper->read_from_file(dictionary_decompressor);
add_occurrence(logtype_dict_entry_wrapper, id);
}
// NOTE(review): presumably add_occurrence queued the loaded entries as uncommitted; they were just read from disk, so the queue is emptied - confirm
m_uncommitted_entries.clear();

// Close the readers before the same paths are opened for writing below
segment_index_decompressor.close();
segment_index_file_reader.close();
dictionary_decompressor.close();
dictionary_file_reader.close();

m_dictionary_file_writer.open(dictionary_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING);
// Open compressor
m_dictionary_compressor.open(m_dictionary_file_writer);

m_segment_index_file_writer.open(segment_index_path, FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING);
// Open compressor
m_segment_index_compressor.open(m_segment_index_file_writer);

m_is_open = true;
}

bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr<LogTypeDictionaryEntry>& entry_wrapper, logtype_dictionary_id_t& logtype_id) {
if (nullptr == entry_wrapper) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
auto& entry = *entry_wrapper;

bool LogTypeDictionaryWriter::add_entry (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id) {
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
bool is_new_entry = false;

const string& value = entry.get_value();
const auto ix = m_value_to_entry.find(value);
if (m_value_to_entry.end() != ix) {
// Entry exists so increment its count
auto& existing_entry = ix->second;
logtype_id = existing_entry->get_id();
const string& value = logtype_entry.get_value();
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
if (value.empty()) {
throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__);
}
const auto ix = m_value_to_id.find(value);
if (m_value_to_id.end() != ix) {
// Entry exists so get its ID
logtype_id = ix->second;
} else {
// Dictionary entry doesn't exist so create it

Expand All @@ -81,25 +36,22 @@ bool LogTypeDictionaryWriter::add_occurrence (std::unique_ptr<LogTypeDictionaryE
} else {
verbosity = LogVerbosity_UNKNOWN;
}
entry.set_verbosity(verbosity);
logtype_entry.set_verbosity(verbosity);

// Assign ID
logtype_id = m_next_id;
++m_next_id;
entry.set_id(logtype_id);
logtype_entry.set_id(logtype_id);

// Insert new entry into dictionary
auto entry_ptr = entry_wrapper.release();
m_value_to_entry[value] = entry_ptr;
m_id_to_entry[logtype_id] = entry_ptr;

// Mark ID as dirty
m_uncommitted_entries.emplace_back(entry_ptr);
m_value_to_id[value] = logtype_id;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

is_new_entry = true;

// TODO: This doesn't account for the segment index that's constantly updated
m_data_size += entry_ptr->get_data_size();
m_data_size += logtype_entry.get_data_size();

logtype_entry.write_to_file(m_dictionary_compressor);
}
return is_new_entry;
}
15 changes: 3 additions & 12 deletions components/core/src/LogTypeDictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,11 @@ class LogTypeDictionaryWriter : public DictionaryWriter<logtype_dictionary_id_t,

// Methods
/**
* Opens dictionary, loads entries, and then sets it up for writing
* @param dictionary_path
* @param segment_index_path
* @param max_id
*/
void open_and_preload (const std::string& dictionary_path, const std::string& segment_index_path, logtype_dictionary_id_t max_id);

/**
* Adds an entry to the dictionary if it doesn't exist, or increases its occurrence count if it does. If the entry does not exist, the entry pointer is
* released from entry_wrapper and stored in the dictionary.
* @param entry_wrapper
* Adds the given entry to the dictionary if it doesn't exist
* @param logtype_entry
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
* @param logtype_id ID of the logtype matching the given entry
*/
bool add_occurrence (std::unique_ptr<LogTypeDictionaryEntry>& entry_wrapper, logtype_dictionary_id_t& logtype_id);
bool add_entry (LogTypeDictionaryEntry& logtype_entry, logtype_dictionary_id_t& logtype_id);
};

#endif // LOGTYPEDICTIONARYWRITER_HPP
2 changes: 2 additions & 0 deletions components/core/src/VariableDictionaryEntry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class VariableDictionaryEntry : public DictionaryEntry<variable_dictionary_id_t>
*/
size_t get_data_size () const;

/**
 * Clears the value held by this entry (m_value)
 */
void clear () { m_value.clear(); }

/**
* Writes an entry to file
* @param compressor
Expand Down
Loading