Use ArrayBackedSet to replace std::set for index in segment #47

Merged Jan 23, 2022 (41 commits)

Commits
fa435ab
1.Remove uncommited entries from memory during compression. 2.Updated…
haiqi96 Nov 26, 2021
c34dcfa
Updated unit-test
haiqi96 Nov 26, 2021
667251e
Fix a mistake in the test and added a new variable string that will b…
haiqi96 Nov 26, 2021
6a74d3c
address some issues that are easier to fix in the pull request review
haiqi96 Nov 27, 2021
611c992
fix a variable name mismatch that was not caught by compiler and remo…
haiqi96 Nov 27, 2021
5dd2954
remove unused arguments from function
haiqi96 Nov 27, 2021
5f12a10
replace dynamically created entry with static ones. updated the open_…
haiqi96 Nov 27, 2021
2d4cbf1
better function refactor
haiqi96 Nov 27, 2021
c21ac31
use unique_ptr for dynamically allocated set. replace for loop with d…
haiqi96 Nov 27, 2021
366a463
address some issues in code review
haiqi96 Nov 28, 2021
9f97f79
Move LogEntry off from stack to avoid unnecessary allocation and dest…
haiqi96 Nov 28, 2021
07808cd
renamed add_occurrence function and slightly modified how next_id is …
haiqi96 Nov 28, 2021
17d256b
Fixed a few small places for cleaner code
haiqi96 Nov 28, 2021
0c6b202
Merge branch 'y-scope:main' into main
haiqi96 Dec 3, 2021
a96ff48
Merge branch 'y-scope:main' into main
haiqi96 Dec 29, 2021
66b9a6a
Merge branch 'y-scope:main' into main
haiqi96 Jan 11, 2022
1ba3d32
port changes over
haiqi96 Jan 11, 2022
e2d65ba
reorganize as template
haiqi96 Jan 11, 2022
60d0d28
add specific function for write to compressor
haiqi96 Jan 11, 2022
52abda1
rename the class, further cleaned up unused functions
haiqi96 Jan 11, 2022
7d6fc76
revert unintended changes
haiqi96 Jan 11, 2022
ac21336
revert unintended changes
haiqi96 Jan 11, 2022
6d10463
add CMakefile
haiqi96 Jan 11, 2022
50a8373
backip
haiqi96 Jan 13, 2022
e288963
fix
haiqi96 Jan 14, 2022
6969c3f
combine open and create file, and some other clean up
haiqi96 Jan 14, 2022
ae7322b
rename one member variable and add more comments to the newly created…
haiqi96 Jan 15, 2022
25a3292
further clean up comment
haiqi96 Jan 15, 2022
e3d7a49
rename the new data sturcture, update function description in the hea…
haiqi96 Jan 17, 2022
80963a4
adjust the order of header inclusion and constant define. removed one…
haiqi96 Jan 17, 2022
ed3c85c
simplified unnecessarily complicated sorting logic
haiqi96 Jan 17, 2022
50a693f
fix
haiqi96 Jan 17, 2022
d8da846
fix various comment issue
haiqi96 Jan 20, 2022
b148bee
Add more fixes
haiqi96 Jan 20, 2022
e73d4b2
fix nit and rename some functions
haiqi96 Jan 20, 2022
7baf5bd
fix issues in the comment
haiqi96 Jan 22, 2022
4887b72
more fixes
haiqi96 Jan 23, 2022
a44e11b
revert one const auto change
haiqi96 Jan 23, 2022
e631739
fix nit
haiqi96 Jan 23, 2022
b6b7157
add nullptr check and fix captitalization
haiqi96 Jan 23, 2022
fa99509
capitalization
haiqi96 Jan 23, 2022

Changes from 28 commits

2 changes: 2 additions & 0 deletions components/core/CMakeLists.txt
@@ -250,6 +250,8 @@ set(SOURCE_FILES_clp
submodules/json/single_include/nlohmann/json.hpp
submodules/sqlite3/sqlite3.c
submodules/sqlite3/sqlite3.h
src/IDOccurrenceArray.cpp
src/IDOccurrenceArray.hpp
)
add_executable(clp ${SOURCE_FILES_clp})
target_link_libraries(clp
11 changes: 5 additions & 6 deletions components/core/src/DictionaryWriter.hpp
@@ -16,6 +16,7 @@
#include "FileWriter.hpp"
#include "streaming_compression/zstd/Compressor.hpp"
#include "TraceableException.hpp"
#include "IDOccurrenceArray.hpp"

/**
* Template class for performing operations on dictionaries and writing them to disk
@@ -72,7 +73,7 @@ class DictionaryWriter {
* @param segment_id
* @param ids
*/
void index_segment (segment_id_t segment_id, const std::unordered_set<DictionaryIdType>& ids);
void index_segment (segment_id_t segment_id, const IDOccurrenceArray<DictionaryIdType>& ids);

/**
* Gets the size of the dictionary when it is stored on disk
@@ -227,18 +228,16 @@ void DictionaryWriter<DictionaryIdType, EntryType>::open_and_preload (const std:
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::index_segment (segment_id_t segment_id, const std::unordered_set<DictionaryIdType>& ids) {
void DictionaryWriter<DictionaryIdType, EntryType>::index_segment (segment_id_t segment_id, const IDOccurrenceArray<DictionaryIdType>& ids) {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

m_segment_index_compressor.write_numeric_value(segment_id);

// NOTE: The IDs in `ids` are not validated to exist in this dictionary since we perform validation when loading the dictionary.
m_segment_index_compressor.write_numeric_value<uint64_t>(ids.size());
for (auto id : ids) {
m_segment_index_compressor.write_numeric_value(id);
}
m_segment_index_compressor.write_numeric_value<uint64_t>(ids.num_ids());
ids.write_to_compressor(m_segment_index_compressor);

++m_num_segments_in_index;

1 change: 1 addition & 0 deletions components/core/src/IDOccurrenceArray.cpp
@@ -0,0 +1 @@
#include "IDOccurrenceArray.hpp"
203 changes: 203 additions & 0 deletions components/core/src/IDOccurrenceArray.hpp
@@ -0,0 +1,203 @@
//
// Created by haiqixu on 1/6/2022.
//

#ifndef IDOCCURRENCEARRAY_HPP
#define IDOCCURRENCEARRAY_HPP

// C++ standard libraries
#include <vector>
#include <unordered_set>

// spdlog
#include <spdlog/spdlog.h>

// Project headers
#include "Defs.h"
#include "streaming_compression/zstd/Compressor.hpp"
#include "TraceableException.hpp"

// Constants
#define MAX_CAPACITY 2147483648
#define DEFAULT_CAPACITY 1024

template<typename DictionaryIdType>
class IDOccurrenceArray {
public:

class OperationFailed : public TraceableException {
public:
// Constructors
OperationFailed (ErrorCode error_code, const char* const filename, int line_number) : TraceableException(error_code, filename, line_number) {}

// Methods
const char* what () const noexcept override {
return "IDOccurrenceArray operation failed";
}
};

// Constructors
IDOccurrenceArray ();

explicit IDOccurrenceArray (size_t initial_capacity);

// Destructors
~IDOccurrenceArray () = default;

// Methods
/**
* Inserts the given ID into the occurrence array
* @param id
*/
void insert_id (DictionaryIdType id);

/**
* Gets the number of distinct IDs in the array
* @return The number of IDs in the array
*/
size_t num_ids () const { return m_num_ids; }

/**
* Clears the ID occurrence info and resets
* the array back to its initial capacity
*/
void reset ();

/**
* Inserts all IDs from another IDOccurrenceArray
* into the caller IDOccurrenceArray object
* @param ids_occurrence_array
*/
void insert_id_from_array (const IDOccurrenceArray<DictionaryIdType>& ids_occurrence_array);

/**
* Inserts all IDs from an unordered_set
* into the caller IDOccurrenceArray object
* @param ids_set
*/
void insert_id_from_set (const std::unordered_set <DictionaryIdType>& ids_set);

/**
* Writes all IDs in the array into the segment index compressor
* @param segment_index_compressor
*/
void write_to_compressor (streaming_compression::zstd::Compressor& segment_index_compressor) const;

private:

std::vector<bool> m_data;
size_t m_initial_capacity;

// Tracks number of IDs that occurred in the array
size_t m_num_ids;

// Tracks the largest ID in the array
size_t m_largest_id;

/**
* Increases the capacity of the bool array so that the
* input ID becomes a valid index of the array
* @throw IDOccurrenceArray::OperationFailed if the input ID is already within the bool array's index range
* @param id
*/
void increase_capacity (size_t id);
};

template<typename DictionaryIdType>
IDOccurrenceArray<DictionaryIdType>::IDOccurrenceArray () {
m_initial_capacity = DEFAULT_CAPACITY;
reset();
}

template<typename DictionaryIdType>
IDOccurrenceArray<DictionaryIdType>::IDOccurrenceArray (size_t initial_capacity) {
m_initial_capacity = initial_capacity;
reset();
}

template<typename DictionaryIdType>
void IDOccurrenceArray<DictionaryIdType>::reset () {
m_data.clear();
m_data.resize(m_initial_capacity, false);
m_num_ids = 0;
m_largest_id = 0;
}

template<typename DictionaryIdType>
void IDOccurrenceArray<DictionaryIdType>::insert_id (DictionaryIdType id) {

if (id >= m_data.size()) {
increase_capacity(id);
}
// if the id is not already marked as "occurred"
if (!m_data[id]) {
m_data[id] = true;
m_num_ids++;
// update the largest id if necessary
if (id > m_largest_id) {
m_largest_id = id;
}
}
}

template<typename DictionaryIdType>
void IDOccurrenceArray<DictionaryIdType>::increase_capacity (size_t id) {
if (id < m_data.size()) {
SPDLOG_ERROR("increase_capacity called with an ID that is already within the array's capacity.");
throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__);
}
size_t capacity = m_data.size();
do {
capacity = capacity * 2;
} while (capacity <= id);

if (capacity > MAX_CAPACITY) {
SPDLOG_WARN("Size of array {} exceeds the maximum capacity {}.", capacity, MAX_CAPACITY);
}
m_data.resize(capacity, false);
}

template<typename DictionaryIdType>
void IDOccurrenceArray<DictionaryIdType>::insert_id_from_array (const IDOccurrenceArray<DictionaryIdType>& ids_occurrence_array) {

const auto& input_data_array = ids_occurrence_array.m_data; // reference, to avoid copying the bitmap
size_t input_max_id = ids_occurrence_array.m_largest_id;
// increase size of the boolean array if needed
if (input_max_id >= m_data.size()) {
increase_capacity(input_max_id);
}
for (size_t id = 0; id <= input_max_id; id++) {
// If an ID occurs in the input array but not in the caller's
// array, add it to the caller's array
if (!m_data[id] && input_data_array[id]) {
m_data[id] = true;
m_num_ids++;
}
}
if (input_max_id > m_largest_id) {
m_largest_id = input_max_id;
}
}

template<typename DictionaryIdType>
void IDOccurrenceArray<DictionaryIdType>::insert_id_from_set (const std::unordered_set <DictionaryIdType>& ids_set) {
for (const DictionaryIdType id: ids_set) {
insert_id(id);
}
}

template<typename DictionaryIdType>
void IDOccurrenceArray<DictionaryIdType>::write_to_compressor (streaming_compression::zstd::Compressor& segment_index_compressor) const {

for (size_t id = 0; id <= m_largest_id; id++) {
if (m_data[id]) {
segment_index_compressor.write_numeric_value((DictionaryIdType) id);
}
}
}

#endif //IDOCCURRENCEARRAY_HPP
28 changes: 14 additions & 14 deletions components/core/src/clp/FileCompressor.cpp
@@ -37,7 +37,7 @@ static void compute_and_add_empty_directories (const set<string>& directories, c
* @param archive
* @param file
*/
static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive, streaming_archive::writer::File* file);
static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive);

static void compute_and_add_empty_directories (const set<string>& directories, const set<string>& parent_directories,
const boost::filesystem::path& parent_path, streaming_archive::writer::Archive& archive)
@@ -69,12 +69,12 @@ static void compute_and_add_empty_directories (const set<string>& directories, c
archive.add_empty_directories(empty_directories);
}

static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive, streaming_archive::writer::File* file) {
static void write_message_to_encoded_file (const ParsedMessage& msg, streaming_archive::writer::Archive& archive) {
if (msg.has_ts_patt_changed()) {
archive.change_ts_pattern(*file, msg.get_ts_patt());
archive.change_ts_pattern(msg.get_ts_patt());
}

archive.write_msg(*file, msg.get_ts(), msg.get_content(), msg.get_orig_num_bytes());
archive.write_msg(msg.get_ts(), msg.get_content(), msg.get_orig_num_bytes());
}

namespace clp {
@@ -117,32 +117,32 @@ namespace clp {
m_parsed_message.clear();

// Open compressed file
auto* file = create_and_open_file(archive_writer, path_for_compression, group_id, m_uuid_generator(), 0);
archive_writer.create_and_open_file(path_for_compression, group_id, m_uuid_generator(), 0);

// Parse content from UTF-8 validation buffer
size_t buf_pos = 0;
while (m_message_parser.parse_next_message(false, m_utf8_validation_buf_length, m_utf8_validation_buf, buf_pos, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
} else if (file->get_encoded_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
} else if (archive_writer.get_encoded_file_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
}

write_message_to_encoded_file(m_parsed_message, archive_writer, file);
write_message_to_encoded_file(m_parsed_message, archive_writer);
}

// Parse remaining content from file
while (m_message_parser.parse_next_message(true, reader, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
} else if (file->get_encoded_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer, file);
split_file_and_archive(archive_user_config, path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
} else if (archive_writer.get_encoded_file_size_in_bytes() >= target_encoded_file_size) {
split_file(path_for_compression, group_id, m_parsed_message.get_ts_patt(), archive_writer);
}

write_message_to_encoded_file(m_parsed_message, archive_writer, file);
write_message_to_encoded_file(m_parsed_message, archive_writer);
}

close_file_and_mark_ready_for_segment(archive_writer, file);
close_file_and_mark_ready_for_segment(archive_writer);
}

bool FileCompressor::try_compressing_as_archive (size_t target_data_size_of_dicts, streaming_archive::writer::Archive::UserConfig& archive_user_config,
43 changes: 17 additions & 26 deletions components/core/src/clp/utils.cpp
@@ -180,17 +180,9 @@ namespace clp {
return all_paths_exist;
}

streaming_archive::writer::File* create_and_open_file (streaming_archive::writer::Archive& archive_writer, const string& path_for_compression,
group_id_t group_id, const boost::uuids::uuid& orig_file_id, size_t split_ix)
{
auto* file = archive_writer.create_file(path_for_compression, group_id, orig_file_id, split_ix);
archive_writer.open_file(*file);
return file;
}

void close_file_and_mark_ready_for_segment (streaming_archive::writer::Archive& archive_writer, streaming_archive::writer::File*& file) {
archive_writer.close_file(*file);
archive_writer.mark_file_ready_for_segment(file);
void close_file_and_mark_ready_for_segment (streaming_archive::writer::Archive& archive_writer) {
archive_writer.close_file();
archive_writer.mark_file_ready_for_segment();
}

void split_archive (streaming_archive::writer::Archive::UserConfig& archive_user_config, streaming_archive::writer::Archive& archive_writer) {
@@ -201,31 +193,30 @@ namespace clp {
}

void split_file (const string& path_for_compression, group_id_t group_id, const TimestampPattern* last_timestamp_pattern,
streaming_archive::writer::Archive& archive_writer, streaming_archive::writer::File*& file)
streaming_archive::writer::Archive& archive_writer)
{
auto orig_file_id = file->get_orig_file_id();
auto split_ix = file->get_split_ix();
file->set_is_split(true);
close_file_and_mark_ready_for_segment(archive_writer, file);
auto orig_file_id = archive_writer.get_orig_file_id();
auto split_ix = archive_writer.get_file_split_ix();
archive_writer.set_file_is_split(true);
close_file_and_mark_ready_for_segment(archive_writer);

file = create_and_open_file(archive_writer, path_for_compression, group_id, orig_file_id, ++split_ix);
archive_writer.create_and_open_file(path_for_compression, group_id, orig_file_id, ++split_ix);
// Initialize the file's timestamp pattern to the previous split's pattern
archive_writer.change_ts_pattern(*file, last_timestamp_pattern);
archive_writer.change_ts_pattern(last_timestamp_pattern);
}

void split_file_and_archive (streaming_archive::writer::Archive::UserConfig& archive_user_config, const string& path_for_compression, group_id_t group_id,
const TimestampPattern* last_timestamp_pattern, streaming_archive::writer::Archive& archive_writer,
streaming_archive::writer::File*& file)
const TimestampPattern* last_timestamp_pattern, streaming_archive::writer::Archive& archive_writer)
{
auto orig_file_id = file->get_orig_file_id();
auto split_ix = file->get_split_ix();
file->set_is_split(true);
close_file_and_mark_ready_for_segment(archive_writer, file);
auto orig_file_id = archive_writer.get_orig_file_id();
auto split_ix = archive_writer.get_file_split_ix();
archive_writer.set_file_is_split(true);
close_file_and_mark_ready_for_segment(archive_writer);

split_archive(archive_user_config, archive_writer);

file = create_and_open_file(archive_writer, path_for_compression, group_id, orig_file_id, ++split_ix);
archive_writer.create_and_open_file(path_for_compression, group_id, orig_file_id, ++split_ix);
// Initialize the file's timestamp pattern to the previous split's pattern
archive_writer.change_ts_pattern(*file, last_timestamp_pattern);
archive_writer.change_ts_pattern(last_timestamp_pattern);
}
}