Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ArrayBackedSet to replace std::set for index in segment #47

Merged
merged 41 commits into from
Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
fa435ab
1.Remove uncommitted entries from memory during compression. 2.Updated…
haiqi96 Nov 26, 2021
c34dcfa
Updated unit-test
haiqi96 Nov 26, 2021
667251e
Fix a mistake in the test and added a new variable string that will b…
haiqi96 Nov 26, 2021
6a74d3c
address some issues that are easier to fix in the pull request review
haiqi96 Nov 27, 2021
611c992
fix a variable name mismatch that was not caught by compiler and remo…
haiqi96 Nov 27, 2021
5dd2954
remove unused arguments from function
haiqi96 Nov 27, 2021
5f12a10
replace dynamically created entry with static ones. updated the open_…
haiqi96 Nov 27, 2021
2d4cbf1
better function refactor
haiqi96 Nov 27, 2021
c21ac31
use unique_ptr for dynamically allocated set. replace for loop with d…
haiqi96 Nov 27, 2021
366a463
address some issues in code review
haiqi96 Nov 28, 2021
9f97f79
Move LogEntry off from stack to avoid unnecessary allocation and dest…
haiqi96 Nov 28, 2021
07808cd
renamed add_occurrence function and slightly modified how next_id is …
haiqi96 Nov 28, 2021
17d256b
Fixed a few small places for cleaner code
haiqi96 Nov 28, 2021
0c6b202
Merge branch 'y-scope:main' into main
haiqi96 Dec 3, 2021
a96ff48
Merge branch 'y-scope:main' into main
haiqi96 Dec 29, 2021
66b9a6a
Merge branch 'y-scope:main' into main
haiqi96 Jan 11, 2022
1ba3d32
port changes over
haiqi96 Jan 11, 2022
e2d65ba
reorganize as template
haiqi96 Jan 11, 2022
60d0d28
add specific function for write to compressor
haiqi96 Jan 11, 2022
52abda1
rename the class, further cleaned up unused functions
haiqi96 Jan 11, 2022
7d6fc76
revert unintended changes
haiqi96 Jan 11, 2022
ac21336
revert unintended changes
haiqi96 Jan 11, 2022
6d10463
add CMakefile
haiqi96 Jan 11, 2022
50a8373
backup
haiqi96 Jan 13, 2022
e288963
fix
haiqi96 Jan 14, 2022
6969c3f
combine open and create file, and some other clean up
haiqi96 Jan 14, 2022
ae7322b
rename one member variable and add more comments to the newly created…
haiqi96 Jan 15, 2022
25a3292
further clean up comment
haiqi96 Jan 15, 2022
e3d7a49
rename the new data structure, update function description in the hea…
haiqi96 Jan 17, 2022
80963a4
adjust the order of header inclusion and constant define. removed one…
haiqi96 Jan 17, 2022
ed3c85c
simplified unnecessarily complicated sorting logic
haiqi96 Jan 17, 2022
50a693f
fix
haiqi96 Jan 17, 2022
d8da846
fix various comment issue
haiqi96 Jan 20, 2022
b148bee
Add more fixes
haiqi96 Jan 20, 2022
e73d4b2
fix nit and rename some functions
haiqi96 Jan 20, 2022
7baf5bd
fix issues in the comment
haiqi96 Jan 22, 2022
4887b72
more fixes
haiqi96 Jan 23, 2022
a44e11b
revert one const auto change
haiqi96 Jan 23, 2022
e631739
fix nit
haiqi96 Jan 23, 2022
b6b7157
add nullptr check and fix capitalization
haiqi96 Jan 23, 2022
fa99509
capitalization
haiqi96 Jan 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ include(cmake/Modules/FindLibraryDependencies.cmake)
FindDynamicLibraryDependencies(sqlite "${sqlite_DYNAMIC_LIBS}")

set(SOURCE_FILES_clp
src/ArrayBackedPosIntSet.cpp
src/ArrayBackedPosIntSet.hpp
src/clp/clp.cpp
src/clp/CommandLineArguments.cpp
src/clp/CommandLineArguments.hpp
Expand Down
1 change: 1 addition & 0 deletions components/core/src/ArrayBackedPosIntSet.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "ArrayBackedPosIntSet.hpp"
199 changes: 199 additions & 0 deletions components/core/src/ArrayBackedPosIntSet.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#ifndef ARRAYBACKEDPOSINTSET_HPP
#define ARRAYBACKEDPOSINTSET_HPP

// C++ standard libraries
#include <unordered_set>
#include <vector>

kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
// spdlog
#include <spdlog/spdlog.h>

// Project headers
#include "Defs.h"
#include "streaming_compression/zstd/Compressor.hpp"
#include "TraceableException.hpp"

haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Template class of set implemented with vector<bool> for continuously increasing numeric value
 * @tparam PosIntType
 */
template<typename PosIntType>
class ArrayBackedPosIntSet {
public:
    // Types
    class OperationFailed : public TraceableException {
    public:
        // Constructors
        OperationFailed (ErrorCode error_code, const char* const filename, int line_number) : TraceableException(error_code, filename, line_number) {}

        // Methods
        const char* what () const noexcept override {
            return "ArrayBackedPosIntSet operation failed";
        }
    };

    // Constructors
    /**
     * Constructs the set with a default initial capacity
     */
    ArrayBackedPosIntSet ();

    /**
     * Constructs the set with the given initial capacity
     * @param initial_capacity
     */
    explicit ArrayBackedPosIntSet (size_t initial_capacity);

    // Methods
    /**
     * Gets the number of unique values in the set
     */
    size_t size () const { return m_size; }

    /**
     * Clears the set and restores its initial capacity
     */
    void clear ();

    /**
     * Inserts the given value, growing the backing array if necessary
     * @param value
     */
    void insert (PosIntType value);

    /**
     * Inserts all values from the given set
     * @param input_set
     */
    void insert_all (const ArrayBackedPosIntSet<PosIntType>& input_set);

    /**
     * Inserts all values from the given set
     * @param input_set
     */
    void insert_all (const std::unordered_set<PosIntType>& input_set);

    /**
     * Inserts all values from the given vector
     * @param input_vector
     */
    void insert_all (const std::vector<PosIntType>& input_vector);

    /**
     * Writes all values in the set into the given compressor
     * @param compressor
     */
    void write_to_compressor (streaming_compression::Compressor& compressor) const;

private:
    // Methods
    /**
     * Increases the capacity of the bool array so that
     * the given value becomes a valid index in the array
     * @param value
     */
    void increase_capacity (size_t value);

    // Variables
    std::vector<bool> m_data;
    size_t m_initial_capacity;

    // The number of unique values in the set
    size_t m_size;

    // The largest value in the set
    PosIntType m_largest_value;
};

template<typename PosIntType>
ArrayBackedPosIntSet<PosIntType>::ArrayBackedPosIntSet () {
    // Default capacity used when the caller doesn't specify one
    static constexpr size_t cDefaultInitialCapacity = 1024;
    m_initial_capacity = cDefaultInitialCapacity;
    clear();
}

template<typename PosIntType>
ArrayBackedPosIntSet<PosIntType>::ArrayBackedPosIntSet (size_t initial_capacity) : m_initial_capacity(initial_capacity) {
    // clear() sizes the backing array and resets the counters
    clear();
}

template<typename PosIntType>
void ArrayBackedPosIntSet<PosIntType>::clear () {
    // Drop all stored values and restore the backing array to its initial capacity, all-false
    m_data.assign(m_initial_capacity, false);
    m_size = 0;
    m_largest_value = 0;
}

template<typename PosIntType>
void ArrayBackedPosIntSet<PosIntType>::insert (PosIntType value) {
    // Grow the backing array so `value` is a valid index
    if (value >= m_data.size()) {
        increase_capacity(value);
    }

    if (m_data[value]) {
        // Value is already in the set; nothing to do
        return;
    }

    m_data[value] = true;
    ++m_size;

    // Track the largest value so scans can stop early
    if (value > m_largest_value) {
        m_largest_value = value;
    }
}

template<typename PosIntType>
void ArrayBackedPosIntSet<PosIntType>::insert_all (const ArrayBackedPosIntSet<PosIntType>& input_set) {
    // Increase capacity if necessary
    size_t input_set_largest_value = input_set.m_largest_value;
    if (input_set_largest_value >= m_data.size()) {
        increase_capacity(input_set_largest_value);
    }

    // Copy values from the input set
    // NOTE: Bind by reference to avoid copying the input set's entire bit array
    const auto& input_set_data = input_set.m_data;
    // NOTE: `size_t` (not `int`) avoids a signed/unsigned comparison and truncation for large values
    for (size_t value = 0; value <= input_set_largest_value; ++value) {
        // Add a value only if
        // - doesn't exist in this set
        // - exists in the input set
        if (false == m_data[value] && input_set_data[value]) {
            m_data[value] = true;
            m_size++;
        }
    }

    // Update the largest value if necessary
    if (input_set_largest_value > m_largest_value) {
        m_largest_value = input_set_largest_value;
    }
}

template<typename PosIntType>
void ArrayBackedPosIntSet<PosIntType>::insert_all (const std::unordered_set<PosIntType>& input_set) {
    // Delegate to insert, which handles deduplication and capacity growth
    for (const auto element : input_set) {
        insert(element);
    }
}

template<typename PosIntType>
void ArrayBackedPosIntSet<PosIntType>::insert_all (const std::vector<PosIntType>& input_vector) {
    // Delegate to insert, which handles deduplication and capacity growth
    for (const auto element : input_vector) {
        insert(element);
    }
}

haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
template<typename PosIntType>
void ArrayBackedPosIntSet<PosIntType>::write_to_compressor (streaming_compression::Compressor& compressor) const {
    // Scan the bit array up to the largest stored value, emitting members in ascending order
    for (PosIntType candidate = 0; candidate <= m_largest_value; ++candidate) {
        if (false == m_data[candidate]) {
            continue;
        }
        compressor.write_numeric_value(candidate);
    }
}

template<typename PosIntType>
void ArrayBackedPosIntSet<PosIntType>::increase_capacity (size_t value) {
    if (value < m_data.size()) {
        SPDLOG_ERROR("Calling increase_capacity on value smaller than capacity.");
        throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
    }
    // Grow by ~1.5x until `value` fits. The "+ 1" guarantees forward progress even when the
    // current capacity is 0 or 1, where `capacity / 2 == 0` would otherwise loop forever.
    auto capacity = m_data.size();
    do {
        capacity += capacity / 2 + 1;
    } while (capacity <= value);

    m_data.resize(capacity, false);
}

#endif //ARRAYBACKEDPOSINTSET_HPP
9 changes: 4 additions & 5 deletions components/core/src/DictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <spdlog/spdlog.h>

// Project headers
#include "ArrayBackedPosIntSet.hpp"
#include "Defs.h"
#include "dictionary_utils.hpp"
#include "FileWriter.hpp"
Expand Down Expand Up @@ -72,7 +73,7 @@ class DictionaryWriter {
* @param segment_id
* @param ids
*/
void index_segment (segment_id_t segment_id, const std::unordered_set<DictionaryIdType>& ids);
void index_segment (segment_id_t segment_id, const ArrayBackedPosIntSet<DictionaryIdType>& ids);

/**
* Gets the size of the dictionary when it is stored on disk
Expand Down Expand Up @@ -227,7 +228,7 @@ void DictionaryWriter<DictionaryIdType, EntryType>::open_and_preload (const std:
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::index_segment (segment_id_t segment_id, const std::unordered_set<DictionaryIdType>& ids) {
void DictionaryWriter<DictionaryIdType, EntryType>::index_segment (segment_id_t segment_id, const ArrayBackedPosIntSet<DictionaryIdType>& ids) {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
Expand All @@ -236,9 +237,7 @@ void DictionaryWriter<DictionaryIdType, EntryType>::index_segment (segment_id_t

// NOTE: The IDs in `ids` are not validated to exist in this dictionary since we perform validation when loading the dictionary.
m_segment_index_compressor.write_numeric_value<uint64_t>(ids.size());
for (auto id : ids) {
m_segment_index_compressor.write_numeric_value(id);
}
ids.write_to_compressor(m_segment_index_compressor);

++m_num_segments_in_index;

Expand Down
28 changes: 14 additions & 14 deletions components/core/src/clp/FileCompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static void compute_and_add_empty_directories (const set<string>& directories, c
* @param archive
* @param file
*/
static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive, streaming_archive::writer::File* file);
static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive);

static void compute_and_add_empty_directories (const set<string>& directories, const set<string>& parent_directories,
const boost::filesystem::path& parent_path, streaming_archive::writer::Archive& archive)
Expand Down Expand Up @@ -69,12 +69,12 @@ static void compute_and_add_empty_directories (const set<string>& directories, c
archive.add_empty_directories(empty_directories);
}

static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive, streaming_archive::writer::File* file) {
static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive) {
if (msg.has_ts_patt_changed()) {
archive.change_ts_pattern(*file, msg.get_ts_patt());
archive.change_ts_pattern(msg.get_ts_patt());
}

archive.write_msg(*file, msg.get_ts(), msg.get_content(), msg.get_orig_num_bytes());
archive.write_msg(msg.get_ts(), msg.get_content(), msg.get_orig_num_bytes());
}

namespace clp {
Expand Down Expand Up @@ -117,32 +117,32 @@ namespace clp {
m_parsed_message.clear();

// Open compressed file
auto* file = create_and_open_file(archive_writer, path_for_compression, group_id, m_uuid_generator(), 0);
archive_writer.create_and_open_file(path_for_compression, group_id, m_uuid_generator(), 0);

// Parse content from UTF-8 validation buffer
size_t buf_pos = 0;
while (m_message_parser.parse_next_message(false, m_utf8_validation_buf_length, m_utf8_validation_buf, buf_pos, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
} else if (file->get_encoded_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
} else if (archive_writer.get_file().get_encoded_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
}

write_message_to_encoded_file(m_parsed_message, archive_writer, file);
write_message_to_encoded_file(m_parsed_message, archive_writer);
}

// Parse remaining content from file
while (m_message_parser.parse_next_message(true, reader, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
} else if (file->get_encoded_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
} else if (archive_writer.get_file().get_encoded_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
}

write_message_to_encoded_file(m_parsed_message, archive_writer, file);
write_message_to_encoded_file(m_parsed_message, archive_writer);
}

close_file_and_mark_ready_for_segment(archive_writer, file);
close_file_and_append_to_segment(archive_writer);
}

bool FileCompressor::try_compressing_as_archive (size_t target_data_size_of_dicts, streaming_archive::writer::Archive::UserConfig& archive_user_config,
Expand Down
45 changes: 19 additions & 26 deletions components/core/src/clp/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,9 @@ namespace clp {
return all_paths_exist;
}

streaming_archive::writer::File* create_and_open_file (streaming_archive::writer::Archive& archive_writer, const string& path_for_compression,
group_id_t group_id, const boost::uuids::uuid& orig_file_id, size_t split_ix)
{
auto* file = archive_writer.create_file(path_for_compression, group_id, orig_file_id, split_ix);
archive_writer.open_file(*file);
return file;
}

void close_file_and_mark_ready_for_segment (streaming_archive::writer::Archive& archive_writer, streaming_archive::writer::File*& file) {
archive_writer.close_file(*file);
archive_writer.mark_file_ready_for_segment(file);
/**
 * Closes the archive writer's current file, then appends it to the current segment
 * @param archive_writer
 */
void close_file_and_append_to_segment (streaming_archive::writer::Archive& archive_writer) {
archive_writer.close_file();
archive_writer.append_file_to_segment();
}

void split_archive (streaming_archive::writer::Archive::UserConfig& archive_user_config, streaming_archive::writer::Archive& archive_writer) {
Expand All @@ -201,31 +193,32 @@ namespace clp {
}

void split_file (const string& path_for_compression, group_id_t group_id, const TimestampPattern* last_timestamp_pattern,
streaming_archive::writer::Archive& archive_writer, streaming_archive::writer::File*& file)
streaming_archive::writer::Archive& archive_writer)
{
auto orig_file_id = file->get_orig_file_id();
auto split_ix = file->get_split_ix();
file->set_is_split(true);
close_file_and_mark_ready_for_segment(archive_writer, file);
const auto& encoded_file = archive_writer.get_file();
auto orig_file_id = encoded_file.get_orig_file_id();
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
auto split_ix = encoded_file.get_split_ix();
archive_writer.set_file_is_split(true);
close_file_and_append_to_segment(archive_writer);

file = create_and_open_file(archive_writer, path_for_compression, group_id, orig_file_id, ++split_ix);
archive_writer.create_and_open_file(path_for_compression, group_id, orig_file_id, ++split_ix);
// Initialize the file's timestamp pattern to the previous split's pattern
archive_writer.change_ts_pattern(*file, last_timestamp_pattern);
archive_writer.change_ts_pattern(last_timestamp_pattern);
}

void split_file_and_archive (streaming_archive::writer::Archive::UserConfig& archive_user_config, const string& path_for_compression, group_id_t group_id,
const TimestampPattern* last_timestamp_pattern, streaming_archive::writer::Archive& archive_writer,
streaming_archive::writer::File*& file)
const TimestampPattern* last_timestamp_pattern, streaming_archive::writer::Archive& archive_writer)
{
auto orig_file_id = file->get_orig_file_id();
auto split_ix = file->get_split_ix();
file->set_is_split(true);
close_file_and_mark_ready_for_segment(archive_writer, file);
const auto& encoded_file = archive_writer.get_file();
auto orig_file_id = encoded_file.get_orig_file_id();
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
auto split_ix = encoded_file.get_split_ix();
archive_writer.set_file_is_split(true);
close_file_and_append_to_segment(archive_writer);

split_archive(archive_user_config, archive_writer);

file = create_and_open_file(archive_writer, path_for_compression, group_id, orig_file_id, ++split_ix);
archive_writer.create_and_open_file(path_for_compression, group_id, orig_file_id, ++split_ix);
// Initialize the file's timestamp pattern to the previous split's pattern
archive_writer.change_ts_pattern(*file, last_timestamp_pattern);
archive_writer.change_ts_pattern(last_timestamp_pattern);
}
}
Loading