From eedbd2045e19d6683c00dd39ded22be314f6de31 Mon Sep 17 00:00:00 2001 From: haiqi96 <14502009+haiqi96@users.noreply.github.com> Date: Thu, 13 Jun 2024 23:06:22 -0400 Subject: [PATCH] clp-s: Add archive_id to search results. (#435) --- components/core/src/clp_s/ArchiveReader.cpp | 28 +++++--- components/core/src/clp_s/ArchiveReader.hpp | 11 ++-- components/core/src/clp_s/JsonConstructor.cpp | 11 ++-- components/core/src/clp_s/JsonConstructor.hpp | 2 + components/core/src/clp_s/clp-s.cpp | 13 ++-- components/core/src/clp_s/search/Output.cpp | 7 +- .../core/src/clp_s/search/OutputHandler.cpp | 50 ++++++++------ .../core/src/clp_s/search/OutputHandler.hpp | 66 ++++++++++++------- 8 files changed, 115 insertions(+), 73 deletions(-) diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 93f905e3b..14cf4fa0b 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -1,26 +1,34 @@ #include "ArchiveReader.hpp" +#include +#include + #include "archive_constants.hpp" #include "ReaderUtils.hpp" +using std::string_view; + namespace clp_s { -void ArchiveReader::open(std::string const& archive_path) { +void ArchiveReader::open(string_view archives_dir, string_view archive_id) { if (m_is_open) { throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__); } m_is_open = true; - m_archive_path = archive_path; + m_archive_id = archive_id; + std::filesystem::path archive_path{archives_dir}; + archive_path /= m_archive_id; + auto const archive_path_str = archive_path.string(); - m_var_dict = ReaderUtils::get_variable_dictionary_reader(m_archive_path); - m_log_dict = ReaderUtils::get_log_type_dictionary_reader(m_archive_path); - m_array_dict = ReaderUtils::get_array_dictionary_reader(m_archive_path); - m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(m_archive_path); + m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str); + m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str); + m_array_dict = ReaderUtils::get_array_dictionary_reader(archive_path_str); + m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(archive_path_str); - m_schema_tree = ReaderUtils::read_schema_tree(m_archive_path); - m_schema_map = ReaderUtils::read_schemas(m_archive_path); + m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str); + m_schema_map = ReaderUtils::read_schemas(archive_path_str); - m_tables_file_reader.open(m_archive_path + constants::cArchiveTablesFile); - m_table_metadata_file_reader.open(m_archive_path + constants::cArchiveTableMetadataFile); + m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile); + m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile); } void ArchiveReader::read_metadata() { diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 54eb42698..929252dcf 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -29,9 +30,10 @@ class ArchiveReader { /** * Opens an archive for reading. - * @param archive_path + * @param archives_dir + * @param archive_id */ - void open(std::string const& archive_path); + void open(std::string_view archives_dir, std::string_view archive_id); /** * Reads the dictionaries and metadata. @@ -92,6 +94,8 @@ class ArchiveReader { SchemaReader& read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records); + std::string_view get_archive_id() { return m_archive_id; } + std::shared_ptr get_variable_dictionary() { return m_var_dict; } std::shared_ptr get_log_type_dictionary() { return m_log_dict; } @@ -161,8 +165,7 @@ class ArchiveReader { ); bool m_is_open; - std::string m_archive_path; - + std::string m_archive_id; std::shared_ptr m_var_dict; std::shared_ptr m_log_dict; std::shared_ptr m_array_dict; diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index 777f62ce2..218f1107f 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -13,7 +13,8 @@ namespace clp_s { JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_output_dir(option.output_dir), - m_archives_dir(option.archives_dir) { + m_archives_dir(option.archives_dir), + m_archive_id(option.archive_id) { std::error_code error_code; if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) { throw OperationFailed( @@ -28,12 +29,14 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) ); } - if (false == std::filesystem::is_directory(m_archives_dir)) { + std::filesystem::path archive_path{m_archives_dir}; + archive_path /= m_archive_id; + if (false == std::filesystem::is_directory(archive_path)) { throw OperationFailed( ErrorCodeFailure, __FILENAME__, __LINE__, - fmt::format("'{}' is not a directory", m_archives_dir) + fmt::format("'{}' is not a directory", archive_path.c_str()) ); } } @@ -43,7 +46,7 @@ void JsonConstructor::store() { writer.open(m_output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending); m_archive_reader = std::make_unique(); - m_archive_reader->open(m_archives_dir); + m_archive_reader->open(m_archives_dir, m_archive_id); m_archive_reader->read_dictionaries_and_metadata(); m_archive_reader->store(writer); m_archive_reader->close(); diff --git a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index 8aa35f904..688f19418 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -16,6 +16,7 @@ namespace clp_s { struct JsonConstructorOption { std::string archives_dir; + std::string archive_id; std::string output_dir; }; @@ -50,6 +51,7 @@ class JsonConstructor { private: std::string m_archives_dir; + std::string m_archive_id; std::string m_output_dir; std::unique_ptr m_archive_reader; diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index b1dda7508..7caef88a5 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -271,10 +271,11 @@ int main(int argc, char const* argv[]) { clp_s::JsonConstructorOption option; option.output_dir = command_line_arguments.get_output_dir(); + option.archives_dir = archives_dir; try { auto const& archive_id = command_line_arguments.get_archive_id(); if (false == archive_id.empty()) { - option.archives_dir = std::filesystem::path{archives_dir} / archive_id; + option.archive_id = archive_id; decompress_archive(option); } else { for (auto const& entry : std::filesystem::directory_iterator(archives_dir)) { @@ -283,7 +284,7 @@ int main(int argc, char const* argv[]) { continue; } - option.archives_dir = entry.path(); + option.archive_id = entry.path().filename(); decompress_archive(option); } } @@ -330,10 +331,7 @@ int main(int argc, char const* argv[]) { auto const& archive_id = command_line_arguments.get_archive_id(); auto archive_reader = std::make_shared(); if (false == archive_id.empty()) { - std::filesystem::path const archives_dir_path{archives_dir}; - std::string const archive_path{archives_dir_path / archive_id}; - - archive_reader->open(archive_path); + archive_reader->open(archives_dir, archive_id); if (false == search_archive(command_line_arguments, archive_reader, expr, reducer_socket_fd)) { @@ -347,7 +345,8 @@ int main(int argc, char const* argv[]) { continue; } - archive_reader->open(entry.path()); + auto const archive_id = entry.path().filename().string(); + archive_reader->open(archives_dir, archive_id); if (false == search_archive( command_line_arguments, diff --git a/components/core/src/clp_s/search/Output.cpp b/components/core/src/clp_s/search/Output.cpp index 524c055fd..b6a3b8fe0 100644 --- a/components/core/src/clp_s/search/Output.cpp +++ b/components/core/src/clp_s/search/Output.cpp @@ -65,6 +65,7 @@ bool Output::filter() { populate_string_queries(top_level_expr); std::string message; + auto const archive_id = m_archive_reader->get_archive_id(); for (int32_t schema_id : matched_schemas) { m_expr_clp_query.clear(); m_expr_var_match_map.clear(); @@ -85,15 +86,15 @@ bool Output::filter() { auto& reader = m_archive_reader->read_table( schema_id, - m_output_handler->should_output_timestamp(), + m_output_handler->should_output_metadata(), m_should_marshal_records ); reader.initialize_filter(this); - if (m_output_handler->should_output_timestamp()) { + if (m_output_handler->should_output_metadata()) { epochtime_t timestamp; while (reader.get_next_message_with_timestamp(message, timestamp, this)) { - m_output_handler->write(message, timestamp); + m_output_handler->write(message, timestamp, archive_id); } } else { while (reader.get_next_message(message, this)) { diff --git a/components/core/src/clp_s/search/OutputHandler.cpp b/components/core/src/clp_s/search/OutputHandler.cpp index 47a66bbd5..8d6a1eaa5 100644 --- a/components/core/src/clp_s/search/OutputHandler.cpp +++ b/components/core/src/clp_s/search/OutputHandler.cpp @@ -1,6 +1,8 @@ #include "OutputHandler.hpp" #include +#include +#include #include @@ -9,9 +11,12 @@ #include "../../reducer/network_utils.hpp" #include "../../reducer/Record.hpp" +using std::string; +using std::string_view; + namespace clp_s::search { NetworkOutputHandler::NetworkOutputHandler( - std::string const& host, + string const& host, int port, bool should_output_timestamp ) @@ -23,18 +28,14 @@ NetworkOutputHandler::NetworkOutputHandler( } } -void NetworkOutputHandler::write(std::string const& message, epochtime_t timestamp) { - msgpack::type::tuple src(timestamp, message, ""); - msgpack::sbuffer m; - msgpack::pack(m, src); - - if (-1 == send(m_socket_fd, m.data(), m.size(), 0)) { - throw OperationFailed(ErrorCode::ErrorCodeFailureNetwork, __FILE__, __LINE__); - } -} - -void NetworkOutputHandler::write(std::string const& message) { - msgpack::type::tuple src(0, message, ""); +void NetworkOutputHandler::write( + string_view message, + epochtime_t timestamp, + string_view archive_id +) { + static constexpr string_view cOrigFilePathPlaceholder{""}; + msgpack::type::tuple const + src(timestamp, message, cOrigFilePathPlaceholder, archive_id); msgpack::sbuffer m; msgpack::pack(m, src); @@ -44,8 +45,8 @@ void NetworkOutputHandler::write(std::string const& message) { } ResultsCacheOutputHandler::ResultsCacheOutputHandler( - std::string const& uri, - std::string const& collection, + string const& uri, + string const& collection, uint64_t batch_size, uint64_t max_num_results, bool should_output_timestamp @@ -73,7 +74,8 @@ ErrorCode ResultsCacheOutputHandler::flush() { m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document( bsoncxx::builder::basic::kvp("original_path", std::move(result.original_path)), bsoncxx::builder::basic::kvp("message", std::move(result.message)), - bsoncxx::builder::basic::kvp("timestamp", result.timestamp) + bsoncxx::builder::basic::kvp("timestamp", result.timestamp), + bsoncxx::builder::basic::kvp("archive_id", std::move(result.archive_id)) ))); count++; @@ -98,12 +100,20 @@ ErrorCode ResultsCacheOutputHandler::flush() { return ErrorCode::ErrorCodeSuccess; } -void ResultsCacheOutputHandler::write(std::string const& message, epochtime_t timestamp) { +void ResultsCacheOutputHandler::write( + string_view message, + epochtime_t timestamp, + string_view archive_id +) { if (m_latest_results.size() < m_max_num_results) { - m_latest_results.emplace(std::make_unique("", message, timestamp)); + m_latest_results.emplace( + std::make_unique(string_view{}, message, timestamp, archive_id) + ); } else if (m_latest_results.top()->timestamp < timestamp) { m_latest_results.pop(); - m_latest_results.emplace(std::make_unique("", message, timestamp)); + m_latest_results.emplace( + std::make_unique(string_view{}, message, timestamp, archive_id) + ); } } @@ -114,7 +124,7 @@ CountOutputHandler::CountOutputHandler(int reducer_socket_fd) m_pipeline.add_pipeline_stage(std::make_shared()); } -void CountOutputHandler::write(std::string const& message) { +void CountOutputHandler::write(string_view message) { m_pipeline.push_record(reducer::EmptyRecord{}); } diff --git a/components/core/src/clp_s/search/OutputHandler.hpp b/components/core/src/clp_s/search/OutputHandler.hpp index 624c006be..ca12e32d3 100644 --- a/components/core/src/clp_s/search/OutputHandler.hpp +++ b/components/core/src/clp_s/search/OutputHandler.hpp @@ -5,8 +5,10 @@ #include #include +#include #include #include +#include #include #include @@ -28,8 +30,8 @@ namespace clp_s::search { class OutputHandler { public: // Constructors - explicit OutputHandler(bool should_output_timestamp, bool should_marshal_records) - : m_should_output_timestamp(should_output_timestamp), + explicit OutputHandler(bool should_output_metadata, bool should_marshal_records) + : m_should_output_metadata(should_output_metadata), m_should_marshal_records(should_marshal_records) {}; // Destructor @@ -40,14 +42,16 @@ class OutputHandler { * Writes a log event to the output handler. * @param message The message in the log event. * @param timestamp The timestamp of the log event. + * @param archive_id The archive containing the log event. */ - virtual void write(std::string const& message, epochtime_t timestamp) = 0; + virtual void write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) + = 0; /** * Writes a message to the output handler. * @param message The message to write. */ - virtual void write(std::string const& message) = 0; + virtual void write(std::string_view message) = 0; /** * Flushes the output handler after each table that gets searched. @@ -61,12 +65,12 @@ class OutputHandler { */ virtual ErrorCode finish() { return ErrorCode::ErrorCodeSuccess; } - [[nodiscard]] bool should_output_timestamp() const { return m_should_output_timestamp; } + [[nodiscard]] bool should_output_metadata() const { return m_should_output_metadata; } [[nodiscard]] bool should_marshal_records() const { return m_should_marshal_records; } private: - bool m_should_output_timestamp; + bool m_should_output_metadata; bool m_should_marshal_records; }; @@ -76,15 +80,16 @@ class OutputHandler { class StandardOutputHandler : public OutputHandler { public: // Constructors - explicit StandardOutputHandler(bool should_output_timestamp = false) - : OutputHandler(should_output_timestamp, true) {} + explicit StandardOutputHandler(bool should_output_metadata = false) + : OutputHandler(should_output_metadata, true) {} // Methods inherited from OutputHandler - void write(std::string const& message, epochtime_t timestamp) override { - printf("%" EPOCHTIME_T_PRINTF_FMT " %s", timestamp, message.c_str()); + void + write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override { + std::cout << archive_id << ": " << timestamp << " " << message; } - void write(std::string const& message) override { printf("%s", message.c_str()); } + void write(std::string_view message) override { std::cout << message; } }; /** @@ -104,7 +109,7 @@ class NetworkOutputHandler : public OutputHandler { explicit NetworkOutputHandler( std::string const& host, int port, - bool should_output_timestamp = false + bool should_output_metadata = false ); // Destructor @@ -115,9 +120,10 @@ class NetworkOutputHandler : public OutputHandler { } // Methods inherited from OutputHandler - void write(std::string const& message, epochtime_t timestamp) override; + void + write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override; - void write(std::string const& message) override; + void write(std::string_view message) override { write(message, 0, {}); } private: std::string m_host; @@ -133,14 +139,21 @@ class ResultsCacheOutputHandler : public OutputHandler { // Types struct QueryResult { // Constructors - QueryResult(std::string original_path, std::string message, epochtime_t timestamp) - : original_path(std::move(original_path)), - message(std::move(message)), - timestamp(timestamp) {} + QueryResult( + std::string_view original_path, + std::string_view message, + epochtime_t timestamp, + std::string_view archive_id + ) + : original_path(original_path), + message(message), + timestamp(timestamp), + archive_id(archive_id) {} std::string original_path; std::string message; epochtime_t timestamp; + std::string archive_id; }; struct QueryResultGreaterTimestampComparator { @@ -165,7 +178,7 @@ class ResultsCacheOutputHandler : public OutputHandler { std::string const& collection, uint64_t batch_size, uint64_t max_num_results, - bool should_output_timestamp = true + bool should_output_metadata = true ); // Methods inherited from OutputHandler @@ -176,9 +189,10 @@ class ResultsCacheOutputHandler : public OutputHandler { */ ErrorCode flush() override; - void write(std::string const& message, epochtime_t timestamp) override; + void + write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override; - void write(std::string const& message) override { write(message, 0); } + void write(std::string_view message) override { write(message, 0, {}); } private: mongocxx::client m_client; @@ -202,9 +216,10 @@ class CountOutputHandler : public OutputHandler { CountOutputHandler(int reducer_socket_fd); // Methods inherited from OutputHandler - void write(std::string const& message, epochtime_t timestamp) override {} + void + write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override {} - void write(std::string const& message) override; + void write(std::string_view message) override; /** * Flushes the count. @@ -231,12 +246,13 @@ class CountByTimeOutputHandler : public OutputHandler { m_count_by_time_bucket_size{count_by_time_bucket_size} {} // Methods inherited from OutputHandler - void write(std::string const& message, epochtime_t timestamp) override { + void + write(std::string_view message, epochtime_t timestamp, std::string_view archive_id) override { int64_t bucket = (timestamp / m_count_by_time_bucket_size) * m_count_by_time_bucket_size; m_bucket_counts[bucket] += 1; } - void write(std::string const& message) override {} + void write(std::string_view message) override {} /** * Flushes the counts.