From b665de2db21e4b3022c246c26399feb17fd5977a Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Thu, 18 Jul 2024 16:38:35 +0000 Subject: [PATCH 1/7] Record decompression metadata to mongodb during ordered decompression --- .../core/src/clp_s/CommandLineArguments.cpp | 26 +++++++ components/core/src/clp_s/JsonConstructor.cpp | 75 +++++++++++++++---- components/core/src/clp_s/JsonConstructor.hpp | 18 +++-- .../core/src/clp_s/archive_constants.hpp | 9 +++ components/core/src/clp_s/clp-s.cpp | 7 +- 5 files changed, 113 insertions(+), 22 deletions(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 77e896160..d016b5237 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -286,6 +286,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); extraction_options.add(input_options); + po::options_description metadata_options; + // clang-format off + metadata_options.add_options()( + "mongodb-uri", + po::value(&m_mongodb_uri), + "The mongodb database used to record decompression metadata" + )( + "mongodb-collection", + po::value(&m_mongodb_collection), + "The collection used to record decompression metadata" + ); + // clang-format on + extraction_options.add(metadata_options); + po::options_description decompression_options("Decompression Options"); // clang-format off decompression_options.add_options()( @@ -349,6 +363,18 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { throw std::invalid_argument("ordered-chunk-size must be used with ordered argument" ); } + + if (m_mongodb_uri.empty() ^ m_mongodb_collection.empty()) { + throw std::invalid_argument( + "mongodb-uri and mongodb-collection must both be non-empty" + ); + } + + if (false == m_mongodb_uri.empty() && false == m_ordered_decompression) { + throw std::invalid_argument( + "recording decompression metadata only supported for ordered decompression" + ); + } } else if ((char)Command::Search == command_input) { std::string archives_dir; std::string query; diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index 68151a1a7..90887f1e5 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -5,19 +5,19 @@ #include #include +#include +#include +#include +#include +#include "archive_constants.hpp" #include "ErrorCode.hpp" #include "ReaderUtils.hpp" #include "SchemaTree.hpp" #include "TraceableException.hpp" namespace clp_s { -JsonConstructor::JsonConstructor(JsonConstructorOption const& option) - : m_output_dir(option.output_dir), - m_archives_dir(option.archives_dir), - m_ordered{option.ordered}, - m_archive_id(option.archive_id), - m_ordered_chunk_size(option.ordered_chunk_size) { +JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option{option} { std::error_code error_code; if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) { throw OperationFailed( @@ -32,8 +32,8 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) ); } - std::filesystem::path archive_path{m_archives_dir}; - archive_path /= m_archive_id; + std::filesystem::path archive_path{m_option.archives_dir}; + archive_path /= m_option.archive_id; if (false == std::filesystem::is_directory(archive_path)) { throw OperationFailed( ErrorCodeFailure, @@ -46,12 +46,12 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) void JsonConstructor::store() { m_archive_reader = std::make_unique(); - m_archive_reader->open(m_archives_dir, m_archive_id); + m_archive_reader->open(m_option.archives_dir, m_option.archive_id); m_archive_reader->read_dictionaries_and_metadata(); - if (false == m_ordered) { + if (false == m_option.ordered) { FileWriter writer; writer.open( - m_output_dir + "/original", + m_option.output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending ); m_archive_reader->store(writer); @@ -78,10 +78,24 @@ void JsonConstructor::construct_in_order() { epochtime_t first_timestamp{0}; epochtime_t last_timestamp{0}; size_t num_records_marshalled{0}; - auto src_path = std::filesystem::path(m_output_dir) / m_archive_id; + auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id; FileWriter writer; writer.open(src_path, FileWriter::OpenMode::CreateForWriting); + mongocxx::client client; + mongocxx::collection collection; + + if (m_option.metadata_db.has_value()) { + try { + auto const mongo_uri{mongocxx::uri(m_option.metadata_db->mongodb_uri)}; + client = mongocxx::client{mongo_uri}; + collection = client[mongo_uri.database()][m_option.metadata_db->mongodb_collection]; + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCodeBadParamDbUri, __FILE__, __LINE__, e.what()); + } + } + + std::vector results; auto finalize_chunk = [&](bool open_new_writer) { writer.close(); std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_" @@ -93,6 +107,31 @@ void JsonConstructor::construct_in_order() { throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message()); } + if (m_option.metadata_db.has_value()) { + results.emplace_back(std::move(bsoncxx::builder::basic::make_document( + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cPath, + new_file_path.filename() + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cOrigFileId, + m_option.archive_id + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cBeginMsgIx, + static_cast(first_timestamp) + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cEndMsgIx, + static_cast(last_timestamp) + ), + bsoncxx::builder::basic::kvp( + constants::results_cache::decompression::cIsLastIrChunk, + false == open_new_writer + ) + ))); + } + if (open_new_writer) { writer.open(src_path, FileWriter::OpenMode::CreateForWriting); } @@ -112,7 +151,9 @@ void JsonConstructor::construct_in_order() { writer.write(buffer.c_str(), buffer.length()); num_records_marshalled += 1; - if (0 != m_ordered_chunk_size && num_records_marshalled >= m_ordered_chunk_size) { + if (0 != m_option.ordered_chunk_size + && num_records_marshalled >= m_option.ordered_chunk_size) + { finalize_chunk(true); num_records_marshalled = 0; } @@ -128,5 +169,13 @@ void JsonConstructor::construct_in_order() { throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message()); } } + + if (false == results.empty()) { + try { + collection.insert_many(results); + } catch (mongocxx::exception const& e) { + throw OperationFailed(ErrorCodeFailureDbBulkWrite, __FILE__, __LINE__, e.what()); + } + } } } // namespace clp_s diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index 22a2daf59..5b008a0a3 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -1,6 +1,7 @@ #ifndef CLP_S_JSONCONSTRUCTOR_HPP #define CLP_S_JSONCONSTRUCTOR_HPP +#include #include #include #include @@ -15,12 +16,18 @@ #include "TraceableException.hpp" namespace clp_s { +struct MetadataDbOption { + std::string mongodb_uri; + std::string mongodb_collection; +}; + struct JsonConstructorOption { std::string archives_dir; std::string archive_id; std::string output_dir; - bool ordered; - size_t ordered_chunk_size; + bool ordered{false}; + size_t ordered_chunk_size{0}; + std::optional metadata_db; }; class JsonConstructor { @@ -59,12 +66,7 @@ class JsonConstructor { */ void construct_in_order(); - std::string m_archives_dir; - std::string m_archive_id; - std::string m_output_dir; - bool m_ordered{false}; - size_t m_ordered_chunk_size{0}; - + JsonConstructorOption m_option{}; std::unique_ptr m_archive_reader; }; } // namespace clp_s diff --git a/components/core/src/clp_s/archive_constants.hpp b/components/core/src/clp_s/archive_constants.hpp index d5a89d0bf..d16b05da3 100644 --- a/components/core/src/clp_s/archive_constants.hpp +++ b/components/core/src/clp_s/archive_constants.hpp @@ -15,5 +15,14 @@ constexpr char cArchiveArrayDictFile[] = "/array.dict"; constexpr char cArchiveLogDictFile[] = "/log.dict"; constexpr char cArchiveTimestampDictFile[] = "/timestamp.dict"; constexpr char cArchiveVarDictFile[] = "/var.dict"; + +namespace results_cache { namespace decompression { +constexpr char cPath[]{"path"}; +constexpr char cOrigFileId[]{"orig_file_id"}; +constexpr char cBeginMsgIx[]{"begin_msg_ix"}; +constexpr char cEndMsgIx[]{"end_msg_ix"}; +constexpr char cIsLastIrChunk[]{"is_last_ir_chunk"}; +}} // namespace results_cache::decompression + } // namespace clp_s::constants #endif // CLP_S_ARCHIVE_CONSTANTS_HPP diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index d01ed0fe0..a99c5dd34 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -269,11 +269,16 @@ int main(int argc, char const* argv[]) { return 1; } - clp_s::JsonConstructorOption option; + clp_s::JsonConstructorOption option{}; option.output_dir = command_line_arguments.get_output_dir(); option.ordered = command_line_arguments.get_ordered_decompression(); option.archives_dir = archives_dir; option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size(); + if (false == command_line_arguments.get_mongodb_uri().empty()) { + option.metadata_db + = {command_line_arguments.get_mongodb_uri(), + command_line_arguments.get_mongodb_collection()}; + } try { auto const& archive_id = command_line_arguments.get_archive_id(); if (false == archive_id.empty()) { From e3a899b00019f355486dce451ac642a2e583eddd Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Thu, 18 Jul 2024 16:42:35 +0000 Subject: [PATCH 2/7] Make metadata output options visible --- components/core/src/clp_s/CommandLineArguments.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index d016b5237..fdbd03598 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -286,7 +286,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); extraction_options.add(input_options); - po::options_description metadata_options; + po::options_description metadata_options("Metadata Options"); // clang-format off metadata_options.add_options()( "mongodb-uri", @@ -347,6 +347,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { visible_options.add(general_options); visible_options.add(input_options); visible_options.add(decompression_options); + visible_options.add(metadata_options); std::cerr << visible_options << std::endl; return ParsingResult::InfoCommand; } From a9e320cf52f9532b4f04f96a3d03a042dca00ca4 Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Mon, 22 Jul 2024 19:53:32 +0000 Subject: [PATCH 3/7] Add comment --- components/core/src/clp_s/CommandLineArguments.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index fdbd03598..e19a77daa 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -365,6 +365,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); } + // We use xor to check that these arguments are either both specified or both + // unspecified. if (m_mongodb_uri.empty() ^ m_mongodb_collection.empty()) { throw std::invalid_argument( "mongodb-uri and mongodb-collection must both be non-empty" From f6f9f10fc09c46d57069ff95fab24f0eec73fc8c Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Mon, 22 Jul 2024 19:54:01 +0000 Subject: [PATCH 4/7] Fix bug where mongocxx::instance not instantiated before needed in decompression code path --- components/core/src/clp_s/clp-s.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index a99c5dd34..0e0401ad1 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -245,6 +245,7 @@ int main(int argc, char const* argv[]) { } clp_s::TimestampPattern::init(); + mongocxx::instance const mongocxx_instance{}; CommandLineArguments command_line_arguments("clp-s"); auto parsing_result = command_line_arguments.parse_arguments(argc, argv); @@ -300,8 +301,6 @@ int main(int argc, char const* argv[]) { return 1; } } else { - mongocxx::instance const mongocxx_instance{}; - auto const& query = command_line_arguments.get_query(); auto query_stream = std::istringstream(query); auto expr = kql::parse_kql_expression(query_stream); From b413080978ba227317eceb755a9cd17e4dbcf8af Mon Sep 17 00:00:00 2001 From: Devin Gibson Date: Tue, 23 Jul 2024 12:57:43 -0400 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- components/core/src/clp_s/CommandLineArguments.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index e19a77daa..d05225fae 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -290,12 +290,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { // clang-format off metadata_options.add_options()( "mongodb-uri", - po::value(&m_mongodb_uri), - "The mongodb database used to record decompression metadata" + po::value(&m_mongodb_uri)->value_name("URI"), + "MongoDB URI for the database to write decompression metadata to" )( "mongodb-collection", - po::value(&m_mongodb_collection), - "The collection used to record decompression metadata" + po::value(&m_mongodb_collection)->value_name("COLLECTION"), + "MongoDB collection to write decompression metadata to" ); // clang-format on extraction_options.add(metadata_options); @@ -375,7 +375,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { if (false == m_mongodb_uri.empty() && false == m_ordered_decompression) { throw std::invalid_argument( - "recording decompression metadata only supported for ordered decompression" + "Recording decompression metadata only supported for ordered decompression" ); } } else if ((char)Command::Search == command_input) { From a643c4a621ac18be7b83d1397b798b7b6107cb85 Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Tue, 23 Jul 2024 17:06:17 +0000 Subject: [PATCH 6/7] Address review comments --- .../core/src/clp_s/CommandLineArguments.cpp | 30 +++++++++---------- components/core/src/clp_s/JsonConstructor.hpp | 4 +++ .../core/src/clp_s/archive_constants.hpp | 4 +-- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index d05225fae..4cfe017ac 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -286,20 +286,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); extraction_options.add(input_options); - po::options_description metadata_options("Metadata Options"); - // clang-format off - metadata_options.add_options()( - "mongodb-uri", - po::value(&m_mongodb_uri)->value_name("URI"), - "MongoDB URI for the database to write decompression metadata to" - )( - "mongodb-collection", - po::value(&m_mongodb_collection)->value_name("COLLECTION"), - "MongoDB collection to write decompression metadata to" - ); - // clang-format on - extraction_options.add(metadata_options); - po::options_description decompression_options("Decompression Options"); // clang-format off decompression_options.add_options()( @@ -316,6 +302,20 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { // clang-format on extraction_options.add(decompression_options); + po::options_description output_metadata_options("Output Metadata Options"); + // clang-format off + output_metadata_options.add_options()( + "mongodb-uri", + po::value(&m_mongodb_uri)->value_name("URI"), + "MongoDB URI for the database to write decompression metadata to" + )( + "mongodb-collection", + po::value(&m_mongodb_collection)->value_name("COLLECTION"), + "MongoDB collection to write decompression metadata to" + ); + // clang-format on + extraction_options.add(output_metadata_options); + po::positional_options_description positional_options; positional_options.add("archives-dir", 1); positional_options.add("output-dir", 1); @@ -347,7 +347,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { visible_options.add(general_options); visible_options.add(input_options); visible_options.add(decompression_options); - visible_options.add(metadata_options); + visible_options.add(output_metadata_options); std::cerr << visible_options << std::endl; return ParsingResult::InfoCommand; } diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index 5b008a0a3..dd172d2a5 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -17,6 +17,10 @@ namespace clp_s { struct MetadataDbOption { + MetadataDbOption(std::string const& uri, std::string const& collection) + : mongodb_uri(uri), + mongodb_collection(collection) {} + std::string mongodb_uri; std::string mongodb_collection; }; diff --git a/components/core/src/clp_s/archive_constants.hpp b/components/core/src/clp_s/archive_constants.hpp index d16b05da3..30e2b78d5 100644 --- a/components/core/src/clp_s/archive_constants.hpp +++ b/components/core/src/clp_s/archive_constants.hpp @@ -16,13 +16,13 @@ constexpr char cArchiveLogDictFile[] = "/log.dict"; constexpr char cArchiveTimestampDictFile[] = "/timestamp.dict"; constexpr char cArchiveVarDictFile[] = "/var.dict"; -namespace results_cache { namespace decompression { +namespace results_cache::decompression { constexpr char cPath[]{"path"}; constexpr char cOrigFileId[]{"orig_file_id"}; constexpr char cBeginMsgIx[]{"begin_msg_ix"}; constexpr char cEndMsgIx[]{"end_msg_ix"}; constexpr char cIsLastIrChunk[]{"is_last_ir_chunk"}; -}} // namespace results_cache::decompression +} // namespace results_cache::decompression } // namespace clp_s::constants #endif // CLP_S_ARCHIVE_CONSTANTS_HPP From f916270afe040b7e38b91783b8d7bc22145a8f97 Mon Sep 17 00:00:00 2001 From: Devin Gibson Date: Tue, 23 Jul 2024 14:53:34 -0400 Subject: [PATCH 7/7] Update components/core/src/clp_s/JsonConstructor.hpp Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- components/core/src/clp_s/JsonConstructor.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index dd172d2a5..f1f71f9d8 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -18,8 +18,8 @@ namespace clp_s { struct MetadataDbOption { MetadataDbOption(std::string const& uri, std::string const& collection) - : mongodb_uri(uri), - mongodb_collection(collection) {} + : mongodb_uri{uri}, + mongodb_collection{collection} {} std::string mongodb_uri; std::string mongodb_collection;