clp-s: Add option to record metadata about decompressed archive chunks. #485

Merged · 7 commits · Jul 23, 2024
Changes from 4 commits
29 changes: 29 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
@@ -286,6 +286,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
);
extraction_options.add(input_options);

po::options_description metadata_options("Metadata Options");
// clang-format off
metadata_options.add_options()(
"mongodb-uri",
po::value<std::string>(&m_mongodb_uri),
"The mongodb database used to record decompression metadata"
)(
"mongodb-collection",
po::value<std::string>(&m_mongodb_collection),
"The collection used to record decompression metadata"
);
// clang-format on
extraction_options.add(metadata_options);

po::options_description decompression_options("Decompression Options");
// clang-format off
decompression_options.add_options()(
@@ -333,6 +347,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
visible_options.add(general_options);
visible_options.add(input_options);
visible_options.add(decompression_options);
visible_options.add(metadata_options);
std::cerr << visible_options << std::endl;
return ParsingResult::InfoCommand;
}
@@ -349,6 +364,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
throw std::invalid_argument("ordered-chunk-size must be used with ordered argument"
);
}

// We use xor to check that these arguments are either both specified or both
// unspecified.
if (m_mongodb_uri.empty() ^ m_mongodb_collection.empty()) {
Contributor: Can you add a brief comment on why you use "xor"? Something like "they must either both be unspecified, or both be specified."

Contributor Author: Sure, I'll add a comment.

throw std::invalid_argument(
"mongodb-uri and mongodb-collection must both be non-empty"
);
}

if (false == m_mongodb_uri.empty() && false == m_ordered_decompression) {
throw std::invalid_argument(
"recording decompression metadata only supported for ordered decompression"
);
}
} else if ((char)Command::Search == command_input) {
std::string archives_dir;
std::string query;
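The review thread above asks about the xor check: the intent is that --mongodb-uri and --mongodb-collection are either both given or both omitted when running ordered decompression. A minimal standalone sketch of the equivalent validation written without xor; the function name and parameters here are illustrative, not part of this PR:

#include <stdexcept>
#include <string>

// Illustrative only: the same "both or neither" rule, spelled out without xor.
void validate_mongodb_args(std::string const& mongodb_uri, std::string const& mongodb_collection) {
    bool const uri_given{false == mongodb_uri.empty()};
    bool const collection_given{false == mongodb_collection.empty()};
    if (uri_given != collection_given) {
        throw std::invalid_argument("mongodb-uri and mongodb-collection must both be non-empty");
    }
}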
75 changes: 62 additions & 13 deletions components/core/src/clp_s/JsonConstructor.cpp
@@ -5,19 +5,19 @@
#include <system_error>

#include <fmt/core.h>
#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/exception/exception.hpp>
#include <mongocxx/uri.hpp>

#include "archive_constants.hpp"
#include "ErrorCode.hpp"
#include "ReaderUtils.hpp"
#include "SchemaTree.hpp"
#include "TraceableException.hpp"

namespace clp_s {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
: m_output_dir(option.output_dir),
m_archives_dir(option.archives_dir),
m_ordered{option.ordered},
m_archive_id(option.archive_id),
m_ordered_chunk_size(option.ordered_chunk_size) {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option{option} {
std::error_code error_code;
if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) {
throw OperationFailed(
@@ -32,8 +32,8 @@
);
}

std::filesystem::path archive_path{m_archives_dir};
archive_path /= m_archive_id;
std::filesystem::path archive_path{m_option.archives_dir};
archive_path /= m_option.archive_id;
if (false == std::filesystem::is_directory(archive_path)) {
throw OperationFailed(
ErrorCodeFailure,
@@ -46,12 +46,12 @@

void JsonConstructor::store() {
m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_archives_dir, m_archive_id);
m_archive_reader->open(m_option.archives_dir, m_option.archive_id);
m_archive_reader->read_dictionaries_and_metadata();
if (false == m_ordered) {
if (false == m_option.ordered) {
FileWriter writer;
writer.open(
m_output_dir + "/original",
m_option.output_dir + "/original",
FileWriter::OpenMode::CreateIfNonexistentForAppending
);
m_archive_reader->store(writer);
@@ -78,10 +78,24 @@ void JsonConstructor::construct_in_order() {
epochtime_t first_timestamp{0};
epochtime_t last_timestamp{0};
size_t num_records_marshalled{0};
auto src_path = std::filesystem::path(m_output_dir) / m_archive_id;
auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
FileWriter writer;
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);

mongocxx::client client;
mongocxx::collection collection;

if (m_option.metadata_db.has_value()) {
try {
auto const mongo_uri{mongocxx::uri(m_option.metadata_db->mongodb_uri)};
client = mongocxx::client{mongo_uri};
collection = client[mongo_uri.database()][m_option.metadata_db->mongodb_collection];
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCodeBadParamDbUri, __FILE__, __LINE__, e.what());
}
}

std::vector<bsoncxx::document::value> results;
auto finalize_chunk = [&](bool open_new_writer) {
writer.close();
std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_"
@@ -93,6 +107,31 @@
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}

if (m_option.metadata_db.has_value()) {
results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cPath,
new_file_path.filename()
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cOrigFileId,
m_option.archive_id
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cBeginMsgIx,
static_cast<int64_t>(first_timestamp)
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cEndMsgIx,
static_cast<int64_t>(last_timestamp)
),
bsoncxx::builder::basic::kvp(
constants::results_cache::decompression::cIsLastIrChunk,
false == open_new_writer
)
)));
}

if (open_new_writer) {
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);
}
@@ -112,7 +151,9 @@
writer.write(buffer.c_str(), buffer.length());
num_records_marshalled += 1;

if (0 != m_ordered_chunk_size && num_records_marshalled >= m_ordered_chunk_size) {
if (0 != m_option.ordered_chunk_size
&& num_records_marshalled >= m_option.ordered_chunk_size)
{
finalize_chunk(true);
num_records_marshalled = 0;
}
@@ -128,5 +169,13 @@
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}
}

if (false == results.empty()) {
try {
collection.insert_many(results);
} catch (mongocxx::exception const& e) {
throw OperationFailed(ErrorCodeFailureDbBulkWrite, __FILE__, __LINE__, e.what());
}
}
}
} // namespace clp_s
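Each document that finalize_chunk queues up records the chunk's file name (path), the source archive's id (orig_file_id), the chunk's first and last timestamps (begin_msg_ix, end_msg_ix), and whether it is the final chunk (is_last_ir_chunk); the batch is written with a single insert_many at the end of construct_in_order. A minimal sketch of how a downstream consumer could read these documents back with mongocxx — the URI, database, collection, and archive id below are placeholders, not values from this PR:

#include <iostream>

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/json.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/uri.hpp>

int main() {
    // Exactly one driver instance must exist before any client is constructed.
    mongocxx::instance const instance{};
    mongocxx::client client{mongocxx::uri{"mongodb://localhost:27017"}};
    auto collection = client["clp-db"]["decompression-metadata"];  // Placeholder names.

    using bsoncxx::builder::basic::kvp;
    using bsoncxx::builder::basic::make_document;

    // Fetch every chunk-metadata document recorded for one decompressed archive.
    auto cursor = collection.find(make_document(kvp("orig_file_id", "example-archive-id")));
    for (auto const& doc : cursor) {
        std::cout << bsoncxx::to_json(doc) << '\n';
    }
    return 0;
}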
18 changes: 10 additions & 8 deletions components/core/src/clp_s/JsonConstructor.hpp
@@ -1,6 +1,7 @@
#ifndef CLP_S_JSONCONSTRUCTOR_HPP
#define CLP_S_JSONCONSTRUCTOR_HPP

#include <optional>
#include <set>
#include <string>
#include <utility>
@@ -15,12 +16,18 @@
#include "TraceableException.hpp"

namespace clp_s {
struct MetadataDbOption {
std::string mongodb_uri;
std::string mongodb_collection;
};

struct JsonConstructorOption {
std::string archives_dir;
std::string archive_id;
std::string output_dir;
bool ordered;
size_t ordered_chunk_size;
bool ordered{false};
size_t ordered_chunk_size{0};
std::optional<MetadataDbOption> metadata_db;
};

class JsonConstructor {
@@ -59,12 +66,7 @@ class JsonConstructor {
*/
void construct_in_order();

std::string m_archives_dir;
std::string m_archive_id;
std::string m_output_dir;
bool m_ordered{false};
size_t m_ordered_chunk_size{0};

JsonConstructorOption m_option{};
std::unique_ptr<ArchiveReader> m_archive_reader;
};
} // namespace clp_s
9 changes: 9 additions & 0 deletions components/core/src/clp_s/archive_constants.hpp
@@ -15,5 +15,14 @@ constexpr char cArchiveArrayDictFile[] = "/array.dict";
constexpr char cArchiveLogDictFile[] = "/log.dict";
constexpr char cArchiveTimestampDictFile[] = "/timestamp.dict";
constexpr char cArchiveVarDictFile[] = "/var.dict";

namespace results_cache { namespace decompression {
Contributor Author: I thought about it, and the way that would probably make sense, if we did want to share this stuff between clo/clp-s, is to have a utility class that creates the required BSON objects and hides these field names. Right now we do take some classes from clp, but in a kind of nasty way where we have to manually add all of the required files to our build target. Really, what we should do is have a build target that builds a library of utilities that all of our different binaries can use.

Since that's a bit of a bigger refactor, I wanted to avoid including it in this PR. Plus, the namespace style used by clo is inconsistent with what we do here, and I want to avoid mixing styles.

Contributor: Sure
constexpr char cPath[]{"path"};
constexpr char cOrigFileId[]{"orig_file_id"};
constexpr char cBeginMsgIx[]{"begin_msg_ix"};
constexpr char cEndMsgIx[]{"end_msg_ix"};
constexpr char cIsLastIrChunk[]{"is_last_ir_chunk"};
}} // namespace results_cache::decompression

} // namespace clp_s::constants
#endif // CLP_S_ARCHIVE_CONSTANTS_HPP
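A rough sketch of the shared utility the author alludes to above — a hypothetical helper that builds the chunk-metadata document and keeps these field names confined to one place. The function name and its location are assumptions for illustration, not part of this PR:

#include <cstdint>
#include <string>

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/document/value.hpp>

#include "archive_constants.hpp"

namespace clp_s {
// Hypothetical helper: callers never spell out the raw field names.
inline auto make_decompression_chunk_metadata(
        std::string const& path,
        std::string const& orig_file_id,
        int64_t begin_msg_ix,
        int64_t end_msg_ix,
        bool is_last_ir_chunk
) -> bsoncxx::document::value {
    using bsoncxx::builder::basic::kvp;
    using bsoncxx::builder::basic::make_document;
    namespace dec = constants::results_cache::decompression;
    return make_document(
            kvp(dec::cPath, path),
            kvp(dec::cOrigFileId, orig_file_id),
            kvp(dec::cBeginMsgIx, begin_msg_ix),
            kvp(dec::cEndMsgIx, end_msg_ix),
            kvp(dec::cIsLastIrChunk, is_last_ir_chunk)
    );
}
}  // namespace clp_s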
10 changes: 7 additions & 3 deletions components/core/src/clp_s/clp-s.cpp
@@ -245,6 +245,7 @@ int main(int argc, char const* argv[]) {
}

clp_s::TimestampPattern::init();
mongocxx::instance const mongocxx_instance{};

CommandLineArguments command_line_arguments("clp-s");
auto parsing_result = command_line_arguments.parse_arguments(argc, argv);
@@ -269,11 +270,16 @@
return 1;
}

clp_s::JsonConstructorOption option;
clp_s::JsonConstructorOption option{};
option.output_dir = command_line_arguments.get_output_dir();
option.ordered = command_line_arguments.get_ordered_decompression();
option.archives_dir = archives_dir;
option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size();
if (false == command_line_arguments.get_mongodb_uri().empty()) {
option.metadata_db
= {command_line_arguments.get_mongodb_uri(),
command_line_arguments.get_mongodb_collection()};
}
try {
auto const& archive_id = command_line_arguments.get_archive_id();
if (false == archive_id.empty()) {
@@ -295,8 +301,6 @@
return 1;
}
} else {
mongocxx::instance const mongocxx_instance{};

auto const& query = command_line_arguments.get_query();
auto query_stream = std::istringstream(query);
auto expr = kql::parse_kql_expression(query_stream);
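One note on the hunks above: mongocxx requires exactly one mongocxx::instance per process, constructed before any mongocxx::client and kept alive for as long as the driver is used. Since JsonConstructor can now create a client during ordered decompression, the instance moves from the search branch to the top of main(). A minimal sketch of the required ordering; the URI is a placeholder:

#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/uri.hpp>

int main() {
    // Construct the driver instance first; it must outlive every client below.
    mongocxx::instance const mongocxx_instance{};

    // Safe: clients are only created after the instance exists.
    mongocxx::client client{mongocxx::uri{"mongodb://localhost:27017"}};
    return 0;
}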