Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clp-s: Add support for decompression in ascending timestamp order. #440

Merged
merged 8 commits into from
Jun 14, 2024
48 changes: 37 additions & 11 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,40 @@ SchemaReader& ArchiveReader::read_table(
throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__);
}

auto& schema_reader
= create_schema_reader(schema_id, should_extract_timestamp, should_marshal_records);
initialize_schema_reader(
m_schema_reader,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to rename it to initialize_schema_reader?

schema_id,
should_extract_timestamp,
should_marshal_records
);

m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
schema_reader.load(m_tables_decompressor, m_id_to_table_metadata[schema_id].uncompressed_size);
m_schema_reader.load(
m_tables_decompressor,
m_id_to_table_metadata[schema_id].uncompressed_size
);
m_tables_decompressor.close_for_reuse();
return schema_reader;
return m_schema_reader;
}

std::vector<std::shared_ptr<SchemaReader>> ArchiveReader::read_all_tables() {
constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

std::vector<std::shared_ptr<SchemaReader>> readers;
readers.reserve(m_id_to_table_metadata.size());
for (auto const& [id, table_metadata] : m_id_to_table_metadata) {
auto schema_reader = std::make_shared<SchemaReader>();
initialize_schema_reader(*schema_reader, id, true, true);

m_tables_file_reader.try_seek_from_begin(table_metadata.offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size);
m_tables_decompressor.close_for_reuse();

readers.push_back(std::move(schema_reader));
}
return readers;
}

BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int32_t column_id) {
Expand Down Expand Up @@ -200,13 +226,14 @@ void ArchiveReader::append_unordered_reader_columns(
}
}

SchemaReader& ArchiveReader::create_schema_reader(
void ArchiveReader::initialize_schema_reader(
SchemaReader& reader,
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
) {
auto& schema = (*m_schema_map)[schema_id];
m_schema_reader.reset(
reader.reset(
m_schema_tree,
schema_id,
schema.get_ordered_schema_view(),
Expand All @@ -226,7 +253,7 @@ SchemaReader& ArchiveReader::create_schema_reader(
Schema::get_unordered_object_type(column_id)
);
append_unordered_reader_columns(
m_schema_reader,
reader,
mst_subtree_root_node_id,
sub_schema,
should_marshal_records
Expand All @@ -239,21 +266,20 @@ SchemaReader& ArchiveReader::create_schema_reader(
// column id is the root of the unordered object, so we can pass it directly to
// append_unordered_reader_columns.
append_unordered_reader_columns(
m_schema_reader,
reader,
column_id,
std::span<int32_t>(),
should_marshal_records
);
continue;
}
BaseColumnReader* column_reader = append_reader_column(m_schema_reader, column_id);
BaseColumnReader* column_reader = append_reader_column(reader, column_id);

if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
{
m_schema_reader.mark_column_as_timestamp(column_reader);
reader.mark_column_as_timestamp(column_reader);
}
}
return m_schema_reader;
}

void ArchiveReader::store(FileWriter& writer) {
Expand Down
13 changes: 10 additions & 3 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ class ArchiveReader {
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);

/**
* Loads all of the tables in the archive and returns SchemaReaders for them.
* @return the schema readers for every table in the archive
*/
std::vector<std::shared_ptr<SchemaReader>> read_all_tables();

std::string_view get_archive_id() { return m_archive_id; }

std::shared_ptr<VariableDictionaryReader> get_variable_dictionary() { return m_var_dict; }
Expand Down Expand Up @@ -129,13 +135,14 @@ class ArchiveReader {

private:
/**
* Creates a schema reader for a given schema.
* Initializes a schema reader passed by reference to become a reader for a given schema.
* @param reader
* @param schema_id
* @param should_extract_timestamp
* @param should_marshal_records
* @return a reference to the newly created schema reader initialized with the given parameters
*/
SchemaReader& create_schema_reader(
void initialize_schema_reader(
SchemaReader& reader,
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
Expand Down
9 changes: 9 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
);
extraction_options.add(input_options);

po::options_description decompression_options("Decompression Options");
decompression_options.add_options()(
"ordered",
po::bool_switch(&m_ordered_decompression),
"Enable decompression in ascending timestamp order for this archive"
);
extraction_options.add(decompression_options);

po::positional_options_description positional_options;
positional_options.add("archives-dir", 1);
positional_options.add("output-dir", 1);
Expand Down Expand Up @@ -316,6 +324,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
po::options_description visible_options;
visible_options.add(general_options);
visible_options.add(input_options);
visible_options.add(decompression_options);
std::cerr << visible_options << std::endl;
return ParsingResult::InfoCommand;
}
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class CommandLineArguments {

bool get_structurize_arrays() const { return m_structurize_arrays; }

bool get_ordered_decompression() const { return m_ordered_decompression; }

private:
// Methods
/**
Expand Down Expand Up @@ -167,6 +169,7 @@ class CommandLineArguments {
bool m_print_archive_stats{false};
size_t m_max_document_size{512ULL * 1024 * 1024}; // 512 MB
bool m_structurize_arrays{false};
bool m_ordered_decompression{false};

// Metadata db variables
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
Expand Down
31 changes: 30 additions & 1 deletion components/core/src/clp_s/JsonConstructor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "JsonConstructor.hpp"

#include <filesystem>
#include <queue>
#include <system_error>

#include <fmt/core.h>
Expand All @@ -14,6 +15,7 @@ namespace clp_s {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
: m_output_dir(option.output_dir),
m_archives_dir(option.archives_dir),
m_ordered{option.ordered},
m_archive_id(option.archive_id) {
std::error_code error_code;
if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) {
Expand Down Expand Up @@ -43,15 +45,42 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)

void JsonConstructor::store() {
FileWriter writer;
// TODO: change this when doing chunking
writer.open(m_output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending);

m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_archives_dir, m_archive_id);
m_archive_reader->read_dictionaries_and_metadata();
m_archive_reader->store(writer);
if (false == m_ordered) {
m_archive_reader->store(writer);
} else {
construct_in_order(writer);
}
m_archive_reader->close();

writer.close();
}

void JsonConstructor::construct_in_order(FileWriter& writer) {
std::string buffer;
auto tables = m_archive_reader->read_all_tables();
using ReaderPointer = std::shared_ptr<SchemaReader>;
auto cmp = [](ReaderPointer& left, ReaderPointer& right) {
return left->get_next_timestamp() > right->get_next_timestamp();
};
std::priority_queue record_queue(tables.begin(), tables.end(), cmp);
// Clear tables vector so that memory gets deallocated after we have marshalled all records for
// a given table
tables.clear();
while (false == record_queue.empty()) {
ReaderPointer next = record_queue.top();
record_queue.pop();
next->get_next_message(buffer);
if (false == next->done()) {
record_queue.emplace(std::move(next));
}
writer.write(buffer.c_str(), buffer.length());
}
writer.close();
}
} // namespace clp_s
10 changes: 10 additions & 0 deletions components/core/src/clp_s/JsonConstructor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "ColumnReader.hpp"
#include "DictionaryReader.hpp"
#include "ErrorCode.hpp"
#include "FileWriter.hpp"
#include "SchemaReader.hpp"
#include "SchemaTree.hpp"
#include "TraceableException.hpp"
Expand All @@ -18,6 +19,7 @@ struct JsonConstructorOption {
std::string archives_dir;
std::string archive_id;
std::string output_dir;
bool ordered;
};

class JsonConstructor {
Expand Down Expand Up @@ -50,9 +52,17 @@ class JsonConstructor {
void store();

private:
/**
* Reads all of the tables from m_archive_reader and writes all of the records
* they contain to writer in timestamp order.
* @param writer
*/
void construct_in_order(FileWriter& writer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a description for this method?


std::string m_archives_dir;
std::string m_archive_id;
std::string m_output_dir;
bool m_ordered{false};

std::unique_ptr<ArchiveReader> m_archive_reader;
};
Expand Down
10 changes: 10 additions & 0 deletions components/core/src/clp_s/SchemaReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ class SchemaReader {
*/
static int32_t get_first_column_in_span(std::span<int32_t> schema);

/**
* @return the timestamp found in the row pointed to by m_cur_message
*/
epochtime_t get_next_timestamp() const { return m_get_timestamp(); }

/**
* @return true if all records in this table have been iterated over, false otherwise
*/
bool done() const { return m_cur_message >= m_num_messages; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to add a description for this method?


private:
/**
* Merges the current local schema tree with the section of the global schema tree corresponding
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ int main(int argc, char const* argv[]) {

clp_s::JsonConstructorOption option;
option.output_dir = command_line_arguments.get_output_dir();
option.ordered = command_line_arguments.get_ordered_decompression();
option.archives_dir = archives_dir;
try {
auto const& archive_id = command_line_arguments.get_archive_id();
Expand Down
Loading