From 4a67160d5362d9d663d0b907040f398ca75e9790 Mon Sep 17 00:00:00 2001 From: Kirk Rodrigues <2454684+kirkrodrigues@users.noreply.github.com> Date: Mon, 8 Aug 2022 09:04:21 -0400 Subject: [PATCH] Basic implementation of distributed search following conventions of distributed compression. --- .../initialize-orchestration-db.py | 35 ++ .../compression_job_handler.py | 2 +- components/core/CMakeLists.txt | 16 +- components/core/README.md | 1 + components/core/src/PThread.cpp | 58 +++ components/core/src/PThread.hpp | 84 ++++ .../core/src/clo/CommandLineArguments.cpp | 22 +- .../core/src/clo/CommandLineArguments.hpp | 4 + .../src/clo/ControllerMonitoringThread.cpp | 47 +++ .../src/clo/ControllerMonitoringThread.hpp | 29 ++ components/core/src/clo/clo.cpp | 226 +++++++---- .../src/networking/SocketOperationFailed.cpp | 1 + .../src/networking/SocketOperationFailed.hpp | 22 ++ .../core/src/networking/socket_utils.cpp | 58 +++ .../core/src/networking/socket_utils.hpp | 48 +++ .../install-packages-from-source.sh | 1 + .../install-packages-from-source.sh | 1 + .../install-packages-from-source.sh | 1 + .../core/tools/scripts/lib_install/msgpack.sh | 76 ++++ .../executor/celeryconfig.py | 14 +- .../executor/compression_task.py | 10 +- .../job_orchestration/executor/search_task.py | 108 +++++ .../job_orchestration/job_config.py | 7 + .../job_orchestration/scheduler/scheduler.py | 371 +++++++++++++----- .../scheduler/scheduler_data.py | 68 +++- .../package-template/src/sbin/native/search | 208 ++++++++-- components/package-template/src/sbin/search | 2 +- .../package-template/src/sbin/start-clp | 2 +- .../packager/install-scripts/install-core.sh | 2 +- 29 files changed, 1295 insertions(+), 229 deletions(-) create mode 100644 components/core/src/PThread.cpp create mode 100644 components/core/src/PThread.hpp create mode 100644 components/core/src/clo/ControllerMonitoringThread.cpp create mode 100644 components/core/src/clo/ControllerMonitoringThread.hpp create mode 100644 components/core/src/networking/SocketOperationFailed.cpp create mode 100644 components/core/src/networking/SocketOperationFailed.hpp create mode 100644 components/core/src/networking/socket_utils.cpp create mode 100644 components/core/src/networking/socket_utils.hpp create mode 100755 components/core/tools/scripts/lib_install/msgpack.sh create mode 100644 components/job-orchestration/job_orchestration/executor/search_task.py diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index 2057ef24c..d32a05d76 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -73,6 +73,41 @@ def main(argv): ) ROW_FORMAT=DYNAMIC """) + scheduling_db_cursor.execute(f""" + CREATE TABLE IF NOT EXISTS `search_jobs` ( + `id` INT NOT NULL AUTO_INCREMENT, + `status` VARCHAR(16) NOT NULL DEFAULT '{JobStatus.SCHEDULING}', + `status_msg` VARCHAR(255) NOT NULL DEFAULT '', + `creation_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + `start_time` DATETIME NULL DEFAULT NULL, + `duration` INT NULL DEFAULT NULL, + `num_tasks` INT NOT NULL DEFAULT '0', + `num_tasks_completed` INT NOT NULL DEFAULT '0', + `clp_binary_version` INT NULL DEFAULT NULL, + `search_config` VARBINARY(60000) NOT NULL, + PRIMARY KEY (`id`) USING BTREE, + INDEX `JOB_STATUS` (`status`) USING BTREE + ) ROW_FORMAT=DYNAMIC + """) + + scheduling_db_cursor.execute(f""" + CREATE TABLE IF NOT EXISTS 
`search_tasks` ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `status` VARCHAR(16) NOT NULL DEFAULT '{TaskStatus.SUBMITTED}', + `scheduled_time` DATETIME NULL DEFAULT NULL, + `start_time` DATETIME NULL DEFAULT NULL, + `duration` SMALLINT NULL DEFAULT NULL, + `job_id` INT NOT NULL, + `archive_id` VARCHAR(64) NOT NULL, + PRIMARY KEY (`id`) USING BTREE, + INDEX `job_id` (`job_id`) USING BTREE, + INDEX `TASK_STATUS` (`status`) USING BTREE, + INDEX `TASK_START_TIME` (`start_time`) USING BTREE, + CONSTRAINT `search_tasks` FOREIGN KEY (`job_id`) + REFERENCES `search_jobs` (`id`) ON UPDATE NO ACTION ON DELETE NO ACTION + ) ROW_FORMAT=DYNAMIC + """) + scheduling_db.commit() except: logger.exception("Failed to create scheduling tables.") diff --git a/components/compression-job-handler/compression_job_handler/compression_job_handler.py b/components/compression-job-handler/compression_job_handler/compression_job_handler.py index c2587b62b..f43b6a9b8 100644 --- a/components/compression-job-handler/compression_job_handler/compression_job_handler.py +++ b/components/compression-job-handler/compression_job_handler/compression_job_handler.py @@ -361,7 +361,7 @@ def handle_job(scheduling_db, scheduling_db_cursor, clp_io_config: ClpIoConfig, if JobStatus.SCHEDULED == job_status: pass # Simply wait another iteration - elif JobStatus.COMPLETED == job_status: + elif JobStatus.SUCCEEDED == job_status: # All tasks in the job is done speed = 0 if not no_progress_reporting: diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 7c688d1c4..dc7eb5401 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -111,6 +111,14 @@ else() message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for MariaDBClient") endif() +# Find and setup msgpack +find_package(msgpack 4.1.1 REQUIRED) +if(msgpack_FOUND) + message(STATUS "Found msgpack ${msgpack_VERSION}") +else() + message(FATAL_ERROR "Could not find msgpack") +endif() + # Add yaml-cpp add_subdirectory(submodules/yaml-cpp EXCLUDE_FROM_ALL) @@ -489,14 +497,20 @@ set(SOURCE_FILES_clo submodules/sqlite3/sqlite3.c submodules/sqlite3/sqlite3.h submodules/sqlite3/sqlite3ext.h - ) + src/networking/socket_utils.cpp + src/networking/socket_utils.hpp + src/networking/SocketOperationFailed.cpp + src/networking/SocketOperationFailed.hpp + src/PThread.cpp src/PThread.hpp src/clo/ControllerMonitoringThread.cpp src/clo/ControllerMonitoringThread.hpp) add_executable(clo ${SOURCE_FILES_clo}) target_link_libraries(clo PRIVATE Boost::filesystem Boost::iostreams Boost::program_options fmt::fmt + msgpackc-cxx spdlog::spdlog ${sqlite_LIBRARY_DEPENDENCIES} + Threads::Threads ZStd::ZStd ) target_compile_features(clo diff --git a/components/core/README.md b/components/core/README.md index a40882c5e..f0493ecd5 100644 --- a/components/core/README.md +++ b/components/core/README.md @@ -85,6 +85,7 @@ so we've included some scripts to download, compile, and install them: ./tools/scripts/lib_install/libarchive.sh 3.5.1 ./tools/scripts/lib_install/lz4.sh 1.8.2 ./tools/scripts/lib_install/mariadb-connector-c.sh 3.2.3 +./tools/scripts/lib_install/msgpack.sh 4.1.1 ./tools/scripts/lib_install/spdlog.sh 1.9.2 ./tools/scripts/lib_install/zstandard.sh 1.4.9 ``` diff --git a/components/core/src/PThread.cpp b/components/core/src/PThread.cpp new file mode 100644 index 000000000..dc0f0a56b --- /dev/null +++ b/components/core/src/PThread.cpp @@ -0,0 +1,58 @@ +#include "PThread.hpp" + +// spdlog +#include + +// Project headers +#include "Defs.h" + 
+PThread::~PThread () {
+    if (m_thread_running) {
+        SPDLOG_WARN("PThread did not exit before being destroyed.");
+        int returned_value = pthread_cancel(m_thread);
+        if (0 != returned_value) {
+            SPDLOG_ERROR("Failed to cancel thread, errno={}", returned_value);
+        }
+    }
+}
+
+void PThread::start () {
+    int returned_value = pthread_create(&m_thread, nullptr, thread_entry_point, this);
+    if (0 != returned_value) {
+        SPDLOG_ERROR("Failed to start thread, errno={}", returned_value);
+        throw OperationFailed(ErrorCode_errno, __FILENAME__, __LINE__);
+    }
+    m_thread_running = true;
+}
+
+void PThread::detach () {
+    int returned_value = pthread_detach(m_thread);
+    if (0 != returned_value) {
+        throw OperationFailed(ErrorCode_errno, __FILENAME__, __LINE__);
+    }
+}
+
+ErrorCode PThread::try_join (void*& thread_return_value) {
+    auto return_value = pthread_join(m_thread, &thread_return_value);
+    if (0 != return_value) {
+        return ErrorCode_errno;
+    } else {
+        return ErrorCode_Success;
+    }
+}
+
+void* PThread::join () {
+    void* thread_return_value;
+    auto error_code = try_join(thread_return_value);
+    if (ErrorCode_Success != error_code) {
+        throw OperationFailed(error_code, __FILENAME__, __LINE__);
+    }
+    return thread_return_value;
+}
+
+void* PThread::thread_entry_point (void* arg) {
+    auto thread = reinterpret_cast<PThread*>(arg);
+    auto return_value = thread->thread_method();
+    thread->mark_as_exited();
+    return return_value;
+}
diff --git a/components/core/src/PThread.hpp b/components/core/src/PThread.hpp
new file mode 100644
index 000000000..95f9bd732
--- /dev/null
+++ b/components/core/src/PThread.hpp
@@ -0,0 +1,84 @@
+#ifndef PTHREAD_HPP
+#define PTHREAD_HPP
+
+// C standard libraries
+#include <pthread.h>
+
+// C++ standard libraries
+#include <atomic>
+
+// Project headers
+#include "ErrorCode.hpp"
+#include "TraceableException.hpp"
+
+/**
+ * C++ wrapper for POSIX threads
+ */
+class PThread {
+public:
+    // Types
+    class OperationFailed : public TraceableException {
+    public:
+        // Constructors
+        OperationFailed (ErrorCode error_code, const char* const filename, int line_number) :
+                TraceableException(error_code, filename, line_number) {}
+
+        // Methods
+        const char* what () const noexcept override {
+            return "PThread operation failed";
+        }
+    };
+
+    // Constructors
+    PThread () : m_thread(0), m_thread_running(false) {};
+
+    // Destructor
+    virtual ~PThread ();
+
+    // Methods
+    /**
+     * Starts the thread
+     */
+    void start ();
+    /**
+     * Detaches the thread so its resources are released without being joined
+     */
+    void detach ();
+    /**
+     * Tries to join with the thread
+     * @param thread_return_value The thread's return value
+     * @return ErrorCode_errno on failure
+     * @return ErrorCode_Success otherwise
+     */
+    ErrorCode try_join (void*& thread_return_value);
+    /**
+     * Joins with the thread
+     * @return The thread's return value
+     */
+    void* join ();
+
+    bool is_running () const { return m_thread_running; }
+
+protected:
+    // Methods
+    virtual void* thread_method () = 0;
+    /**
+     * Indicates that the thread has exited
+     */
+    void mark_as_exited () { m_thread_running = false; }
+
+private:
+    // Methods
+    /**
+     * Entry-point method for the thread
+     * @param arg
+     * @return Same as PThread::thread_method
+     */
+    static void* thread_entry_point (void* arg);
+
+    // Variables
+    pthread_t m_thread;
+    std::atomic_bool m_thread_running;
+};
+
+#endif //PTHREAD_HPP
diff --git a/components/core/src/clo/CommandLineArguments.cpp b/components/core/src/clo/CommandLineArguments.cpp
index baf1411c3..992ef9db8 100644
---
a/components/core/src/clo/CommandLineArguments.cpp +++ b/components/core/src/clo/CommandLineArguments.cpp @@ -68,11 +68,15 @@ namespace clo { // Define hidden positional options (not shown in Boost's program options help message) po::options_description hidden_positional_options; hidden_positional_options.add_options() + ("search-controller-host", po::value(&m_search_controller_host)) + ("search-controller-port", po::value(&m_search_controller_port)) ("archive-path", po::value(&m_archive_path)) ("wildcard-string", po::value(&m_search_string)) ("file-path", po::value(&m_file_path)) ; po::positional_options_description positional_options_description; + positional_options_description.add("search-controller-host", 1); + positional_options_description.add("search-controller-port", 1); positional_options_description.add("archive-path", 1); positional_options_description.add("wildcard-string", 1); positional_options_description.add("file-path", 1); @@ -117,8 +121,9 @@ namespace clo { cerr << endl; cerr << "Examples:" << endl; - cerr << R"( # Search ARCHIVE_PATH for " ERROR ")" << endl; - cerr << " " << get_program_name() << R"( ARCHIVE_PATH " ERROR ")" << endl; + cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to the controller at localhost:5555)" + << endl; + cerr << " " << get_program_name() << R"( localhost 5555 ARCHIVE_PATH " ERROR ")" << endl; cerr << endl; cerr << "Options can be specified on the command line or through a configuration file." << endl; @@ -132,6 +137,16 @@ namespace clo { return ParsingResult::InfoCommand; } + // Validate search controller host was specified + if (m_search_controller_host.empty()) { + throw invalid_argument("SEARCH_CONTROLLER_HOST not specified or empty."); + } + + // Validate search controller port was specified + if (m_search_controller_port.empty()) { + throw invalid_argument("SEARCH_CONTROLLER_PORT not specified or empty."); + } + // Validate archive path was specified if (m_archive_path.empty()) { throw invalid_argument("ARCHIVE_PATH not specified or empty."); @@ -190,6 +205,7 @@ namespace clo { } void CommandLineArguments::print_basic_usage () const { - cerr << "Usage: " << get_program_name() << R"( [OPTIONS] ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl; + cerr << "Usage: " << get_program_name() << " [OPTIONS] SEARCH_CONTROLLER_HOST SEARCH_CONTROLLER_PORT " + << R"(ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl; } } diff --git a/components/core/src/clo/CommandLineArguments.hpp b/components/core/src/clo/CommandLineArguments.hpp index 0371fdb99..128625e5e 100644 --- a/components/core/src/clo/CommandLineArguments.hpp +++ b/components/core/src/clo/CommandLineArguments.hpp @@ -22,6 +22,8 @@ namespace clo { // Methods ParsingResult parse_arguments (int argc, const char* argv[]) override; + const std::string& get_search_controller_host () const { return m_search_controller_host; } + const std::string& get_search_controller_port () const { return m_search_controller_port; } const std::string& get_archive_path () const { return m_archive_path; } bool ignore_case () const { return m_ignore_case; } const std::string& get_search_string () const { return m_search_string; } @@ -34,6 +36,8 @@ namespace clo { void print_basic_usage () const override; // Variables + std::string m_search_controller_host; + std::string m_search_controller_port; std::string m_archive_path; bool m_ignore_case; std::string m_search_string; diff --git a/components/core/src/clo/ControllerMonitoringThread.cpp b/components/core/src/clo/ControllerMonitoringThread.cpp new 
file mode 100644
index 000000000..192523666
--- /dev/null
+++ b/components/core/src/clo/ControllerMonitoringThread.cpp
@@ -0,0 +1,47 @@
+#include "ControllerMonitoringThread.hpp"
+
+// C standard libraries
+#include <unistd.h>
+
+// spdlog
+#include <spdlog/spdlog.h>
+
+// Project headers
+#include "../networking/socket_utils.hpp"
+
+void* ControllerMonitoringThread::thread_method () {
+    // Wait for the controller socket to close
+    constexpr size_t cBufLen = 4096;
+    char buf[cBufLen];
+    size_t num_bytes_received;
+    for (bool exit = false; false == exit;) {
+        auto error_code = networking::try_receive(m_controller_socket_fd, buf, cBufLen, num_bytes_received);
+        switch (error_code) {
+            case ErrorCode_EndOfFile:
+                // Controller closed the connection
+                m_query_cancelled = true;
+                exit = true;
+                break;
+            case ErrorCode_Success:
+                // Unexpectedly received data
+                SPDLOG_ERROR("Unexpectedly received {} bytes of data from controller.", num_bytes_received);
+                break;
+            case ErrorCode_BadParam:
+                SPDLOG_ERROR("Bad parameter given to try_receive.");
+                exit = true;
+                break;
+            case ErrorCode_errno:
+                SPDLOG_ERROR("Failed to receive data from controller, errno={}.", errno);
+                exit = true;
+                break;
+            default:
+                SPDLOG_ERROR("Unexpected error from try_receive, error_code={}.", error_code);
+                exit = true;
+                break;
+        }
+    }
+
+    close(m_controller_socket_fd);
+
+    return nullptr;
+}
diff --git a/components/core/src/clo/ControllerMonitoringThread.hpp b/components/core/src/clo/ControllerMonitoringThread.hpp
new file mode 100644
index 000000000..406f69e00
--- /dev/null
+++ b/components/core/src/clo/ControllerMonitoringThread.hpp
@@ -0,0 +1,29 @@
+#ifndef CONTROLLERMONITORINGTHREAD_HPP
+#define CONTROLLERMONITORINGTHREAD_HPP
+
+// Project headers
+#include "../PThread.hpp"
+
+/**
+ * A thread that waits for the controller to close the connection, at which point
+ * it indicates that the query has been cancelled.
+ */ +class ControllerMonitoringThread : public PThread { +public: + // Constructor + ControllerMonitoringThread (int controller_socket_fd) : m_controller_socket_fd(controller_socket_fd), + m_query_cancelled(false) {} + + const std::atomic_bool& get_query_cancelled () const { return m_query_cancelled; } + +protected: + // Methods + void* thread_method () override; + +private: + // Variables + int m_controller_socket_fd; + std::atomic_bool m_query_cancelled; +}; + +#endif //CONTROLLERMONITORINGTHREAD_HPP diff --git a/components/core/src/clo/clo.cpp b/components/core/src/clo/clo.cpp index 7cd0ba68f..a7317b45a 100644 --- a/components/core/src/clo/clo.cpp +++ b/components/core/src/clo/clo.cpp @@ -1,9 +1,15 @@ +// C standard libraries +#include + // C++ libraries #include // Boost libraries #include +// msgpack +#include + // spdlog #include #include @@ -12,9 +18,11 @@ #include "../Defs.h" #include "../Grep.hpp" #include "../Profiler.hpp" +#include "../networking/socket_utils.hpp" #include "../streaming_archive/Constants.hpp" #include "../Utils.hpp" #include "CommandLineArguments.hpp" +#include "ControllerMonitoringThread.hpp" using clo::CommandLineArguments; using std::cout; @@ -28,84 +36,112 @@ using streaming_archive::reader::Archive; using streaming_archive::reader::File; using streaming_archive::reader::Message; +// Local types +enum class SearchFilesResult { + OpenFailure, + ResultSendFailure, + Success +}; + /** - * Prints search result to stdout in binary format + * Connects to the search controller + * @param controller_host + * @param controller_port + * @return -1 on failure + * @return Search controller socket file descriptor otherwise + */ +static int connect_to_search_controller (const string& controller_host, const string& controller_port); +/** + * Sends the search result to the search controller * @param orig_file_path * @param compressed_msg * @param decompressed_msg + * @param controller_socket_fd + * @return Same as networking::try_send */ -static void print_result (const string& orig_file_path, const Message& compressed_msg, const string& decompressed_msg); +static ErrorCode send_result (const string& orig_file_path, const Message& compressed_msg, + const string& decompressed_msg, int controller_socket_fd); /** * Searches all files referenced by a given database cursor * @param query * @param archive * @param file_metadata_ix - * @return true on success, false otherwise + * @param query_cancelled + * @param controller_socket_fd + * @return SearchFilesResult::OpenFailure on failure to open a compressed file + * @return SearchFilesResult::ResultSendFailure on failure to send a result + * @return SearchFilesResult::Success otherwise */ -static bool search_files (Query& query, Archive& archive, MetadataDB::FileIterator& file_metadata_ix); +static SearchFilesResult search_files (Query& query, Archive& archive, MetadataDB::FileIterator& file_metadata_ix, + const std::atomic_bool& query_cancelled, int controller_socket_fd); /** * Searches an archive with the given path * @param command_line_args * @param archive_path + * @param query_cancelled + * @param controller_socket_fd * @return true on success, false otherwise */ -static bool search_archive (const CommandLineArguments& command_line_args, const boost::filesystem::path& archive_path); - -static void print_result (const string& orig_file_path, const Message& compressed_msg, const string& decompressed_msg) { - bool write_successful = true; - do { - size_t length; - size_t num_elems_written; - - // Write file path - length = 
orig_file_path.length(); - num_elems_written = fwrite(&length, sizeof(length), 1, stdout); - if (num_elems_written < 1) { - write_successful = false; - break; - } - num_elems_written = fwrite(orig_file_path.c_str(), sizeof(char), length, stdout); - if (num_elems_written < length) { - write_successful = false; - break; - } +static bool search_archive (const CommandLineArguments& command_line_args, const boost::filesystem::path& archive_path, + const std::atomic_bool& query_cancelled, int controller_socket_fd); - // Write timestamp - epochtime_t timestamp = compressed_msg.get_ts_in_milli(); - num_elems_written = fwrite(×tamp, sizeof(timestamp), 1, stdout); - if (num_elems_written < 1) { - write_successful = false; - break; - } +static int connect_to_search_controller (const string& controller_host, const string& controller_port) { + // Get address info for controller + struct addrinfo hints = {}; + // Address can be IPv4 or IPV6 + hints.ai_family = AF_UNSPEC; + // TCP socket + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = 0; + hints.ai_protocol = 0; + struct addrinfo* addresses_head = nullptr; + int error = getaddrinfo(controller_host.c_str(), controller_port.c_str(), &hints, &addresses_head); + if (0 != error) { + SPDLOG_ERROR("Failed to get address information for search controller, error={}", error); + return -1; + } - // Write logtype ID - auto logtype_id = compressed_msg.get_logtype_id(); - num_elems_written = fwrite(&logtype_id, sizeof(logtype_id), 1, stdout); - if (num_elems_written < 1) { - write_successful = false; - break; + // Try each address until a socket can be created and connected to + int controller_socket_fd = -1; + for (auto curr = addresses_head; nullptr != curr; curr = curr->ai_next) { + // Create socket + controller_socket_fd = socket(curr->ai_family, curr->ai_socktype, curr->ai_protocol); + if (-1 == controller_socket_fd) { + continue; } - // Write message - length = decompressed_msg.length(); - num_elems_written = fwrite(&length, sizeof(length), 1, stdout); - if (num_elems_written < 1) { - write_successful = false; + // Connect to address + if (connect(controller_socket_fd, curr->ai_addr, curr->ai_addrlen) != -1) { break; } - num_elems_written = fwrite(decompressed_msg.c_str(), sizeof(char), length, stdout); - if (num_elems_written < length) { - write_successful = false; - break; - } - } while (false); - if (!write_successful) { - SPDLOG_ERROR("Failed to write result in binary form, errno={}", errno); + + // Failed to connect, so close socket + close(controller_socket_fd); + controller_socket_fd = -1; + } + freeaddrinfo(addresses_head); + if (-1 == controller_socket_fd) { + SPDLOG_ERROR("Failed to connect to search controller, errno={}", errno); + return -1; } + + return controller_socket_fd; +} + +static ErrorCode send_result (const string& orig_file_path, const Message& compressed_msg, + const string& decompressed_msg, int controller_socket_fd) +{ + msgpack::type::tuple src(orig_file_path, compressed_msg.get_ts_in_milli(), + decompressed_msg); + msgpack::sbuffer m; + msgpack::pack(m, src); + return networking::try_send(controller_socket_fd, m.data(), m.size()); } -static bool search_files (Query& query, Archive& archive, MetadataDB::FileIterator& file_metadata_ix) { - bool error_occurred = false; +static SearchFilesResult search_files (Query& query, Archive& archive, MetadataDB::FileIterator& file_metadata_ix, + const std::atomic_bool& query_cancelled, int controller_socket_fd) +{ + SearchFilesResult result = SearchFilesResult::Success; File compressed_file; 
Message compressed_message; @@ -122,7 +158,7 @@ static bool search_files (Query& query, Archive& archive, MetadataDB::FileIterat } else { SPDLOG_ERROR("Failed to open {}, error={}", orig_path.c_str(), error_code); } - error_occurred = true; + result = SearchFilesResult::OpenFailure; continue; } @@ -131,24 +167,38 @@ static bool search_files (Query& query, Archive& archive, MetadataDB::FileIterat } else { query.make_all_sub_queries_relevant(); } - while (Grep::search_and_decompress(query, archive, compressed_file, compressed_message, decompressed_message)) { - print_result(compressed_file.get_orig_path(), compressed_message, decompressed_message); + while (false == query_cancelled && + Grep::search_and_decompress(query, archive, compressed_file, compressed_message, decompressed_message)) + { + error_code = send_result(compressed_file.get_orig_path(), compressed_message, decompressed_message, + controller_socket_fd); + if (ErrorCode_Success != error_code) { + result = SearchFilesResult::ResultSendFailure; + break; + } + } + if (SearchFilesResult::ResultSendFailure == result) { + // Stop search now since results aren't reaching the controller + break; } archive.close_file(compressed_file); } - return error_occurred; + return result; } -static bool search_archive (const CommandLineArguments& command_line_args, const boost::filesystem::path& archive_path) { +static bool search_archive (const CommandLineArguments& command_line_args, const boost::filesystem::path& archive_path, + const std::atomic_bool& query_cancelled, int controller_socket_fd) +{ if (false == boost::filesystem::exists(archive_path)) { SPDLOG_ERROR("Archive '{}' does not exist.", archive_path.c_str()); return false; } auto archive_metadata_file = archive_path / streaming_archive::cMetadataFileName; if (false == boost::filesystem::exists(archive_metadata_file)) { - SPDLOG_ERROR("Archive metadata file '{}' does not exist. '{}' may not be an archive.", archive_metadata_file.c_str(), archive_path.c_str()); + SPDLOG_ERROR("Archive metadata file '{}' does not exist. 
'{}' may not be an archive.", + archive_metadata_file.c_str(), archive_path.c_str()); return false; } @@ -160,10 +210,10 @@ static bool search_archive (const CommandLineArguments& command_line_args, const auto search_end_ts = command_line_args.get_search_end_ts(); Query query; - if (false == Grep::process_raw_query(archive_reader, command_line_args.get_search_string(), search_begin_ts, search_end_ts, command_line_args.ignore_case(), - query)) + if (false == Grep::process_raw_query(archive_reader, command_line_args.get_search_string(), search_begin_ts, + search_end_ts, command_line_args.ignore_case(), query)) { - return false; + return true; } // Get all segments potentially containing query results @@ -173,15 +223,17 @@ static bool search_archive (const CommandLineArguments& command_line_args, const ids_of_segments_to_search.insert(ids_of_matching_segments.cbegin(), ids_of_matching_segments.cend()); } - // Search files outside a segment - auto file_metadata_ix_ptr = archive_reader.get_file_iterator(search_begin_ts, search_end_ts, command_line_args.get_file_path(), cInvalidSegmentId); + // Search segments + auto file_metadata_ix_ptr = archive_reader.get_file_iterator(search_begin_ts, search_end_ts, + command_line_args.get_file_path(), cInvalidSegmentId); auto& file_metadata_ix = *file_metadata_ix_ptr; - search_files(query, archive_reader, file_metadata_ix); - - // Search files inside a segment for (auto segment_id : ids_of_segments_to_search) { file_metadata_ix.set_segment_id(segment_id); - search_files(query, archive_reader, file_metadata_ix); + auto result = search_files(query, archive_reader, file_metadata_ix, query_cancelled, controller_socket_fd); + if (SearchFilesResult::ResultSendFailure == result) { + // Stop search now since results aren't reaching the controller + break; + } } file_metadata_ix_ptr.reset(nullptr); @@ -215,22 +267,50 @@ int main (int argc, const char* argv[]) { break; } + int controller_socket_fd = connect_to_search_controller(command_line_args.get_search_controller_host(), + command_line_args.get_search_controller_port()); + if (-1 == controller_socket_fd) { + return -1; + } + const auto archive_path = boost::filesystem::path(command_line_args.get_archive_path()); + ControllerMonitoringThread controller_monitoring_thread(controller_socket_fd); + controller_monitoring_thread.start(); + + int return_value = 0; try { - if (false == search_archive(command_line_args, archive_path)) { - return -1; + if (false == search_archive(command_line_args, archive_path, controller_monitoring_thread.get_query_cancelled(), + controller_socket_fd)) + { + return_value = -1; } } catch (TraceableException& e) { auto error_code = e.get_error_code(); if (ErrorCode_errno == error_code) { SPDLOG_ERROR("Search failed: {}:{} {}, errno={}", e.get_filename(), e.get_line_number(), e.what(), errno); - return -1; + return_value = -1; } else { - SPDLOG_ERROR("Search failed: {}:{} {}, error_code={}", e.get_filename(), e.get_line_number(), e.what(), error_code); - return -1; + SPDLOG_ERROR("Search failed: {}:{} {}, error_code={}", e.get_filename(), e.get_line_number(), e.what(), + error_code); + return_value = -1; } } - return 0; + // Unblock the controller monitoring thread if it's blocked + auto shutdown_result = shutdown(controller_socket_fd, SHUT_RDWR); + if (0 != shutdown_result) { + if (ENOTCONN != shutdown_result) { + SPDLOG_ERROR("Failed to shutdown socket, error={}", shutdown_result); + } // else connection already disconnected, so nothing to do + } + + void* thread_return_value; + auto 
error_code = controller_monitoring_thread.try_join(thread_return_value); + if (ErrorCode_Success != error_code) { + SPDLOG_ERROR("Failed to join with controller monitoring thread: errno={}", errno); + return_value = -1; + } + + return return_value; } diff --git a/components/core/src/networking/SocketOperationFailed.cpp b/components/core/src/networking/SocketOperationFailed.cpp new file mode 100644 index 000000000..c899023c4 --- /dev/null +++ b/components/core/src/networking/SocketOperationFailed.cpp @@ -0,0 +1 @@ +#include "SocketOperationFailed.hpp" diff --git a/components/core/src/networking/SocketOperationFailed.hpp b/components/core/src/networking/SocketOperationFailed.hpp new file mode 100644 index 000000000..ceb0cd638 --- /dev/null +++ b/components/core/src/networking/SocketOperationFailed.hpp @@ -0,0 +1,22 @@ +#ifndef NETWORKING_SOCKETOPERATIONFAILED_HPP +#define NETWORKING_SOCKETOPERATIONFAILED_HPP + +// Project headers +#include "../ErrorCode.hpp" +#include "../TraceableException.hpp" + +namespace networking { + class SocketOperationFailed : public TraceableException { + public: + // Constructors + SocketOperationFailed (ErrorCode error_code, const char* const filename, int line_number) : + TraceableException(error_code, filename, line_number) {} + + // Methods + const char* what () const noexcept override { + return "Socket operation failed"; + } + }; +} + +#endif //NETWORKING_SOCKETOPERATIONFAILED_HPP diff --git a/components/core/src/networking/socket_utils.cpp b/components/core/src/networking/socket_utils.cpp new file mode 100644 index 000000000..c965206a8 --- /dev/null +++ b/components/core/src/networking/socket_utils.cpp @@ -0,0 +1,58 @@ +#include "socket_utils.hpp" + +// C standard libraries +#include + +// C++ standard libraries +#include + +// Project headers +#include "../Defs.h" +#include "SocketOperationFailed.hpp" + +namespace networking { + ErrorCode try_send (int fd, const char* buf, size_t buf_len) { + if (fd < 0 || nullptr == buf) { + return ErrorCode_BadParam; + } + + ssize_t num_bytes_sent = ::send(fd, buf, buf_len, 0); + if (-1 == num_bytes_sent) { + return ErrorCode_errno; + } + + return ErrorCode_Success; + } + + void send (int fd, const char* buf, size_t buf_len) { + auto error_code = try_send(fd, buf, buf_len); + if (ErrorCode_Success != error_code) { + throw SocketOperationFailed(error_code, __FILENAME__, __LINE__); + } + } + + + ErrorCode try_receive (int fd, char* buf, size_t buf_len, size_t& num_bytes_received) { + if (fd < 0 || nullptr == buf) { + return ErrorCode_BadParam; + } + + ssize_t result = recv(fd, buf, buf_len, 0); + if (result < 0) { + return ErrorCode_errno; + } + if (0 == result) { + return ErrorCode_EndOfFile; + } + num_bytes_received = result; + + return ErrorCode_Success; + } + + void receive (int fd, char* buf, size_t buf_len, size_t& num_bytes_received) { + auto error_code = try_receive(fd, buf, buf_len, num_bytes_received); + if (ErrorCode_Success != error_code) { + throw SocketOperationFailed(error_code, __FILENAME__, __LINE__); + } + } +} diff --git a/components/core/src/networking/socket_utils.hpp b/components/core/src/networking/socket_utils.hpp new file mode 100644 index 000000000..95a24a59f --- /dev/null +++ b/components/core/src/networking/socket_utils.hpp @@ -0,0 +1,48 @@ +#ifndef NETWORKING_SOCKET_UTILS_HPP +#define NETWORKING_SOCKET_UTILS_HPP + +// C++ standard libraries +#include + +// Project headers +#include "../ErrorCode.hpp" + +namespace networking { + // Methods + /** + * Tries to send a buffer of data over the socket 
+ * @param fd + * @param buf + * @param buf_len + * @return ErrorCode_BadParam if the file descriptor or buffer pointer is invalid + * @return ErrorCode_errno if sending failed + * @return ErrorCode_Success otherwise + */ + ErrorCode try_send (int fd, const char* buf, size_t buf_len); + /** + * Sends a buffer of data over the socket + * @param fd + * @param buf + * @param buf_len + */ + void send (int fd, const char* buf, size_t buf_len); + + /** + * Tries to receive up to a given number of bytes over a socket + * @param buf Buffer to store received bytes + * @param buf_len Number of bytes to receive + * @return ErrorCode_BadParam if file descriptor or buffer pointer are invalid + * @return ErrorCode_EndOfFile on EOF + * @return ErrorCode_errno if receiving failed + * @return ErrorCode_Success otherwise + */ + ErrorCode try_receive (int fd, char* buf, size_t buf_len, size_t& num_bytes_received); + /** + * Receives up to the give number of bytes over a socket + * @param buf Buffer to store received bytes + * @param buf_len Number of bytes to receive + */ + void receive (int fd, char* buf, size_t buf_len, size_t& num_bytes_received); +} + +#endif //NETWORKING_SOCKET_UTILS_HPP diff --git a/components/core/tools/docker-images/clp-env-base-bionic/setup-scripts/install-packages-from-source.sh b/components/core/tools/docker-images/clp-env-base-bionic/setup-scripts/install-packages-from-source.sh index 3b5bd2b8b..d00e436a7 100755 --- a/components/core/tools/docker-images/clp-env-base-bionic/setup-scripts/install-packages-from-source.sh +++ b/components/core/tools/docker-images/clp-env-base-bionic/setup-scripts/install-packages-from-source.sh @@ -4,5 +4,6 @@ ./tools/scripts/lib_install/libarchive.sh 3.5.1 ./tools/scripts/lib_install/lz4.sh 1.8.2 ./tools/scripts/lib_install/mariadb-connector-c.sh 3.2.3 +./tools/scripts/lib_install/msgpack.sh 4.1.1 ./tools/scripts/lib_install/spdlog.sh 1.9.2 ./tools/scripts/lib_install/zstandard.sh 1.4.9 diff --git a/components/core/tools/docker-images/clp-env-base-centos7.4/setup-scripts/install-packages-from-source.sh b/components/core/tools/docker-images/clp-env-base-centos7.4/setup-scripts/install-packages-from-source.sh index f19eba88a..fcc48a946 100755 --- a/components/core/tools/docker-images/clp-env-base-centos7.4/setup-scripts/install-packages-from-source.sh +++ b/components/core/tools/docker-images/clp-env-base-centos7.4/setup-scripts/install-packages-from-source.sh @@ -11,5 +11,6 @@ source /opt/rh/devtoolset-7/enable ./tools/scripts/lib_install/libarchive.sh 3.5.1 ./tools/scripts/lib_install/lz4.sh 1.8.2 ./tools/scripts/lib_install/mariadb-connector-c.sh 3.2.3 +./tools/scripts/lib_install/msgpack.sh 4.1.1 ./tools/scripts/lib_install/spdlog.sh 1.9.2 ./tools/scripts/lib_install/zstandard.sh 1.4.9 diff --git a/components/core/tools/docker-images/clp-env-base-focal/setup-scripts/install-packages-from-source.sh b/components/core/tools/docker-images/clp-env-base-focal/setup-scripts/install-packages-from-source.sh index 3b5bd2b8b..d00e436a7 100755 --- a/components/core/tools/docker-images/clp-env-base-focal/setup-scripts/install-packages-from-source.sh +++ b/components/core/tools/docker-images/clp-env-base-focal/setup-scripts/install-packages-from-source.sh @@ -4,5 +4,6 @@ ./tools/scripts/lib_install/libarchive.sh 3.5.1 ./tools/scripts/lib_install/lz4.sh 1.8.2 ./tools/scripts/lib_install/mariadb-connector-c.sh 3.2.3 +./tools/scripts/lib_install/msgpack.sh 4.1.1 ./tools/scripts/lib_install/spdlog.sh 1.9.2 ./tools/scripts/lib_install/zstandard.sh 1.4.9 diff --git 
a/components/core/tools/scripts/lib_install/msgpack.sh b/components/core/tools/scripts/lib_install/msgpack.sh new file mode 100755 index 000000000..eb62101cb --- /dev/null +++ b/components/core/tools/scripts/lib_install/msgpack.sh @@ -0,0 +1,76 @@ +#!/bin/bash + +# Dependencies: +# - Boost +# NOTE: Dependencies should be installed outside the script to allow the script to be largely distro-agnostic + +# Exit on any error +set -e + +cUsage="Usage: ${BASH_SOURCE[0]} [ <.deb output directory>]" +if [ "$#" -lt 1 ] ; then + echo $cUsage + exit +fi +version=$1 + +package_name=msgpackc-cxx +temp_dir=/tmp/${package_name}-installation +deb_output_dir=${temp_dir} +if [[ "$#" -gt 1 ]] ; then + deb_output_dir="$(readlink -f "$2")" + if [ ! -d ${deb_output_dir} ] ; then + echo "${deb_output_dir} does not exist or is not a directory" + exit + fi +fi + +# Check if already installed +set +e +dpkg -l ${package_name} +installed=$? +set -e +if [ $installed -eq 0 ] ; then + # Nothing to do + exit +fi + +echo "Checking for elevated privileges..." +privileged_command_prefix="" +if [ ${EUID:-$(id -u)} -ne 0 ] ; then + sudo echo "Script can elevate privileges." + privileged_command_prefix="${privileged_command_prefix} sudo" +fi + +# Download +mkdir -p $temp_dir +cd $temp_dir +extracted_dir=${temp_dir}/msgpack-cxx-${version} +if [ ! -e ${extracted_dir} ] ; then + tar_filename=msgpack-cxx-${version}.tar.gz + if [ ! -e ${tar_filename} ] ; then + curl -fsSL https://github.com/msgpack/msgpack-c/releases/download/cpp-${version}/${tar_filename} -o ${tar_filename} + fi + + tar -xf ${tar_filename} +fi + +# Set up +cd ${extracted_dir} +cmake . + +# Check if checkinstall is installed +set +e +command -v checkinstall +checkinstall_installed=$? +set -e + +# Install +install_command_prefix="${privileged_command_prefix}" +if [ $checkinstall_installed -eq 0 ] ; then + install_command_prefix="${install_command_prefix} checkinstall --pkgname '${package_name}' --pkgversion '${version}' --provides '${package_name}' --nodoc -y --pakdir \"${deb_output_dir}\"" +fi +${install_command_prefix} cmake --build . 
--target install + +# Clean up +rm -rf $temp_dir diff --git a/components/job-orchestration/job_orchestration/executor/celeryconfig.py b/components/job-orchestration/job_orchestration/executor/celeryconfig.py index ae7183a8c..4e8c2b47d 100644 --- a/components/job-orchestration/job_orchestration/executor/celeryconfig.py +++ b/components/job-orchestration/job_orchestration/executor/celeryconfig.py @@ -1,15 +1,21 @@ import os -from job_orchestration.scheduler.scheduler_data import QueueName +from job_orchestration.scheduler.scheduler_data import QueueName, TASK_QUEUE_HIGHEST_PRIORITY # Worker settings # Force workers to consume only one task at a time worker_prefetch_multiplier = 1 -imports = ['job_orchestration.executor.compression_task'] +imports = [ + 'job_orchestration.executor.compression_task', + 'job_orchestration.executor.search_task' +] # Queue settings -task_queue_max_priority = 3 -task_routes = {'job_orchestration.executor.compression_task.compress': QueueName.COMPRESSION} +task_queue_max_priority = TASK_QUEUE_HIGHEST_PRIORITY +task_routes = { + 'job_orchestration.executor.compression_task.compress': QueueName.COMPRESSION, + 'job_orchestration.executor.search_task.search': QueueName.SEARCH +} task_create_missing_queues = True # Results backend settings diff --git a/components/job-orchestration/job_orchestration/executor/compression_task.py b/components/job-orchestration/job_orchestration/executor/compression_task.py index 63db92e17..1462b20bd 100644 --- a/components/job-orchestration/job_orchestration/executor/compression_task.py +++ b/components/job-orchestration/job_orchestration/executor/compression_task.py @@ -11,9 +11,10 @@ from job_orchestration.job_config import ClpIoConfig, PathsToCompress from job_orchestration.scheduler.scheduler_data import \ TaskStatus, \ + TaskUpdateType, \ TaskUpdate, \ - TaskCompletionUpdate, \ - TaskFailureUpdate + TaskFailureUpdate, \ + CompressionTaskSuccessUpdate # Setup logging logger = get_task_logger(__name__) @@ -149,6 +150,7 @@ def compress(job_id: int, task_id: int, clp_io_config_json: str, paths_to_compre paths_to_compress = PathsToCompress.parse_raw(paths_to_compress_json) task_update = TaskUpdate( + type=TaskUpdateType.COMPRESSION, job_id=job_id, task_id=task_id, status=TaskStatus.IN_PROGRESS @@ -162,7 +164,8 @@ def compress(job_id: int, task_id: int, clp_io_config_json: str, paths_to_compre database_connection_params) if compression_successful: - task_update = TaskCompletionUpdate( + task_update = CompressionTaskSuccessUpdate( + type=TaskUpdateType.COMPRESSION, job_id=job_id, task_id=task_id, status=TaskStatus.SUCCEEDED, @@ -171,6 +174,7 @@ def compress(job_id: int, task_id: int, clp_io_config_json: str, paths_to_compre ) else: task_update = TaskFailureUpdate( + type=TaskUpdateType.COMPRESSION, job_id=job_id, task_id=task_id, status=TaskStatus.FAILED, diff --git a/components/job-orchestration/job_orchestration/executor/search_task.py b/components/job-orchestration/job_orchestration/executor/search_task.py new file mode 100644 index 000000000..e6a367b2a --- /dev/null +++ b/components/job-orchestration/job_orchestration/executor/search_task.py @@ -0,0 +1,108 @@ +import os +import pathlib +import subprocess + +from celery.utils.log import get_task_logger + +from job_orchestration.job_config import SearchConfig +from job_orchestration.executor.celery import app +from job_orchestration.executor.utils import append_message_to_task_results_queue +from job_orchestration.scheduler.scheduler_data import TaskUpdate, TaskUpdateType, TaskStatus, \ + 
TaskFailureUpdate + +# Setup logging +logger = get_task_logger(__name__) + + +def run_clo(job_id: int, task_id: int, clp_home: pathlib.Path, archive_output_dir: pathlib.Path, logs_dir: pathlib.Path, + search_controller_host: str, search_controller_port: int, archive_id: str, wildcard_query: str, + path_filter: str): + """ + Searches the given archive for the given wildcard query + + :param job_id: + :param task_id: + :param clp_home: + :param archive_output_dir: + :param logs_dir: + :param search_controller_host: + :param search_controller_port: + :param archive_id: + :param wildcard_query: + :param path_filter: + :return: tuple -- (whether the search was successful, output messages) + """ + # Assemble search command + cmd = [ + str(clp_home / 'bin' / 'clo'), + search_controller_host, + str(search_controller_port), + str(archive_output_dir / archive_id), + wildcard_query + ] + if path_filter is not None: + cmd.append(path_filter) + + # Open stderr log file + stderr_filename = f'job-{job_id}-task-{task_id}-stderr.log' + stderr_log_path = logs_dir / stderr_filename + stderr_log_file = open(stderr_log_path, 'w') + + # Start search + logger.debug("Searching started...") + search_successful = False + proc = subprocess.Popen(cmd, stderr=stderr_log_file) + + # Wait for search to finish + return_code = proc.wait() + if 0 != return_code: + logger.error(f'Failed to search, return_code={str(return_code)}') + else: + search_successful = True + logger.debug("Search complete.") + + # Close stderr log file + stderr_log_file.close() + + if search_successful: + return search_successful, None + else: + return search_successful, f"See {stderr_filename} in logs directory." + + +@app.task() +def search(job_id: int, task_id: int, search_config_json: str, archive_id: str): + clp_home = os.getenv('CLP_HOME') + archive_output_dir = os.getenv('CLP_ARCHIVE_OUTPUT_DIR') + logs_dir = os.getenv('CLP_LOGS_DIR') + celery_broker_url = os.getenv('BROKER_URL') + + search_config = SearchConfig.parse_raw(search_config_json) + + task_update = TaskUpdate( + type=TaskUpdateType.SEARCH, + job_id=job_id, + task_id=task_id, + status=TaskStatus.IN_PROGRESS + ) + append_message_to_task_results_queue(celery_broker_url, True, task_update.dict()) + logger.info(f"[job_id={job_id} task_id={task_id}] Search started.") + + search_successful, worker_output = run_clo(job_id, task_id, pathlib.Path(clp_home), + pathlib.Path(archive_output_dir), pathlib.Path(logs_dir), + search_config.search_controller_host, + search_config.search_controller_port, archive_id, + search_config.wildcard_query, search_config.path_filter) + + if search_successful: + task_update.status = TaskStatus.SUCCEEDED + else: + task_update = TaskFailureUpdate( + type=TaskUpdateType.SEARCH, + job_id=job_id, + task_id=task_id, + status=TaskStatus.FAILED, + error_message=worker_output + ) + append_message_to_task_results_queue(celery_broker_url, False, task_update.dict()) + logger.info(f"[job_id={job_id} task_id={task_id}] Search complete.") diff --git a/components/job-orchestration/job_orchestration/job_config.py b/components/job-orchestration/job_orchestration/job_config.py index 0d987a5f6..0df1ab00a 100644 --- a/components/job-orchestration/job_orchestration/job_config.py +++ b/components/job-orchestration/job_orchestration/job_config.py @@ -25,3 +25,10 @@ class OutputConfig(BaseModel): class ClpIoConfig(BaseModel): input: InputConfig output: OutputConfig + + +class SearchConfig(BaseModel): + search_controller_host: str + search_controller_port: int + wildcard_query: str + 
path_filter: str = None diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler.py b/components/job-orchestration/job_orchestration/scheduler/scheduler.py index 40ee45ce9..09be3d234 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler.py @@ -16,15 +16,20 @@ from clp_py_utils.core import read_yaml_config_file from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.compression_task import compress +from job_orchestration.executor.search_task import search from job_orchestration.scheduler.results_consumer import ReconnectingResultsConsumer from job_orchestration.scheduler.scheduler_data import \ - Job, \ + CompressionJob, \ + SearchJob, \ JobStatus, \ - Task, \ + CompressionTask, \ + SearchTask, \ TaskStatus, \ + TaskUpdateType, \ TaskUpdate, \ - TaskCompletionUpdate, \ - TaskFailureUpdate, QueueName + TaskFailureUpdate, \ + CompressionTaskSuccessUpdate, \ + QueueName # Setup logging # Create logger @@ -36,10 +41,11 @@ logger.setLevel(logging.DEBUG) scheduled_jobs = {} +id_to_search_job: typing.Dict[int, SearchJob] = {} jobs_lock = threading.Lock() -def fetch_new_task_metadata(db_cursor) -> list: +def fetch_new_compression_task_metadata(db_cursor) -> list: db_cursor.execute(f""" SELECT compression_jobs.id as job_id, compression_jobs.status as job_status, @@ -56,59 +62,147 @@ def fetch_new_task_metadata(db_cursor) -> list: return db_cursor.fetchall() -def update_task_metadata(db_cursor, task_id, kv: typing.Dict[str, typing.Any]): +def fetch_new_search_task_metadata(db_cursor) -> list: + db_cursor.execute(f""" + SELECT search_jobs.id as job_id, + search_jobs.status as job_status, + search_jobs.num_tasks, + search_jobs.num_tasks_completed, + search_jobs.search_config, + search_tasks.id as task_id, + search_tasks.status as task_status, + search_tasks.archive_id + FROM search_jobs INNER JOIN search_tasks + ON search_jobs.id=search_tasks.job_id + WHERE search_tasks.status='{TaskStatus.SUBMITTED}' + """) + return db_cursor.fetchall() + + +def update_task_metadata(db_cursor, task_type: str, task_id, kv: typing.Dict[str, typing.Any]): if not len(kv): logger.error("Must specify at least one field to update") raise ValueError field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()] - query = f'UPDATE compression_tasks SET {", ".join(field_set_expressions)} WHERE id={task_id};' + query = f'UPDATE {task_type}_tasks SET {", ".join(field_set_expressions)} WHERE id={task_id};' db_cursor.execute(query) -def update_job_metadata(db_cursor, job_id, kv): +def update_compression_task_metadata(db_cursor, task_id, kv: typing.Dict[str, typing.Any]): + update_task_metadata(db_cursor, 'compression', task_id, kv) + + +def update_search_task_metadata(db_cursor, task_id, kv: typing.Dict[str, typing.Any]): + update_task_metadata(db_cursor, 'search', task_id, kv) + + +def update_job_metadata(db_cursor, job_type: str, job_id, kv): if not len(kv): logger.error("Must specify at least one field to update") raise ValueError field_set_expressions = [f'{k}="{v}"' for k, v in kv.items()] - query = f'UPDATE compression_jobs SET {", ".join(field_set_expressions)} WHERE id={job_id};' + query = f'UPDATE {job_type}_jobs SET {", ".join(field_set_expressions)} WHERE id={job_id};' db_cursor.execute(query) -def increment_job_metadata(db_cursor, job_id, kv): +def update_compression_job_metadata(db_cursor, job_id, kv): + update_job_metadata(db_cursor, 'compression', job_id, kv) + + +def 
update_search_job_metadata(db_cursor, job_id, kv): + update_job_metadata(db_cursor, 'search', job_id, kv) + + +def increment_job_metadata(db_cursor, job_type: str, job_id, kv): if not len(kv): logger.error("Must specify at least one field to increment") raise ValueError field_set_expressions = [f'{k}={k}+{v}' for k, v in kv.items()] - query = f'UPDATE compression_jobs SET {", ".join(field_set_expressions)} WHERE id={job_id};' + query = f'UPDATE {job_type}_jobs SET {", ".join(field_set_expressions)} WHERE id={job_id};' db_cursor.execute(query) -def schedule_task(job: Job, task: Task, database_config: Database, dctx: zstandard.ZstdDecompressor = None): +def increment_compression_job_metadata(db_cursor, job_id, kv): + increment_job_metadata(db_cursor, 'compression', job_id, kv) + + +def increment_search_job_metadata(db_cursor, job_id, kv): + increment_job_metadata(db_cursor, 'search', job_id, kv) + + +def schedule_compression_task(job: CompressionJob, task: CompressionTask, database_config: Database, + dctx: zstandard.ZstdDecompressor = None): args = (job.id, task.id, job.get_clp_config_json(dctx), task.get_clp_paths_to_compress_json(dctx), database_config.get_clp_connection_params_and_type(True)) return compress.apply_async(args, task_id=str(task.id), queue=QueueName.COMPRESSION, priority=task.priority) +def schedule_search_task(job: SearchJob, task: SearchTask, dctx: zstandard.ZstdDecompressor): + args = (job.id, task.id, job.get_search_config_json_str(dctx), task.archive_id) + return search.apply_async(args, task_id=str(task.id), queue=QueueName.SEARCH, priority=task.priority) + + def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database): """ For all task with SUBMITTED status, push them to task queue to be processed, if finished, update them """ global scheduled_jobs + global id_to_search_job global jobs_lock logger.debug('Search and schedule new tasks') dctx = zstandard.ZstdDecompressor() - # Fetch new task - for task_row in fetch_new_task_metadata(db_cursor): + # Poll for new search tasks + for task_row in fetch_new_search_task_metadata(db_cursor): + search_task = SearchTask( + id=task_row['task_id'], + status=task_row['task_status'], + job_id=task_row['job_id'], + archive_id=task_row['archive_id'], + ) + job_id = search_task.job_id + + with jobs_lock: + now = datetime.datetime.utcnow() + + try: + search_job = id_to_search_job[job_id] + except KeyError: + # New job + search_job = SearchJob( + id=task_row['job_id'], + status=task_row['job_status'], + start_time=now, + num_tasks=task_row['num_tasks'], + num_tasks_completed=task_row['num_tasks_completed'], + search_config=task_row['search_config'], + ) + update_search_job_metadata(db_cursor, job_id, dict(start_time=now.strftime('%Y-%m-%d %H:%M:%S'))) + id_to_search_job[search_job.id] = search_job + + celery_task = schedule_search_task(search_job, search_task, dctx) + + update_search_task_metadata(db_cursor, search_task.id, dict( + status=TaskStatus.SCHEDULED, + scheduled_time=now.strftime('%Y-%m-%d %H:%M:%S') + )) + db_conn.commit() + + search_task.instance = celery_task + search_task.status = TaskStatus.SCHEDULED + search_job.tasks[search_task.id] = search_task + + # Poll for new compression tasks + for task_row in fetch_new_compression_task_metadata(db_cursor): logger.debug(f"Found task with job_id={task_row['job_id']} task_id={task_row['task_id']}") # Only Add database credentials to ephemeral task specification passed to workers - task = Task( + task = CompressionTask( id=task_row['task_id'], 
status=task_row['task_status'], clp_paths_to_compress=task_row['clp_paths_to_compress'] @@ -122,7 +216,7 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database) job = scheduled_jobs[job_id] except KeyError: # Identified a new job identified - job = Job( + job = CompressionJob( id=task_row['job_id'], status=task_row['job_status'], start_time=now, @@ -130,14 +224,12 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database) num_tasks=task_row['num_tasks'], num_tasks_completed=task_row['num_tasks_completed'] ) - update_job_metadata(db_cursor, job_id, dict( - start_time=now.strftime('%Y-%m-%d %H:%M:%S') - )) + update_compression_job_metadata(db_cursor, job_id, dict(start_time=now.strftime('%Y-%m-%d %H:%M:%S'))) # Schedule task, update ephemeral metadata in scheduler and commit to database - celery_task_instance = schedule_task(job, task, database_config, dctx) + celery_task_instance = schedule_compression_task(job, task, database_config, dctx) - update_task_metadata(db_cursor, task.id, dict( + update_compression_task_metadata(db_cursor, task.id, dict( status=TaskStatus.SCHEDULED, scheduled_time=now.strftime('%Y-%m-%d %H:%M:%S') )) @@ -165,32 +257,170 @@ def search_and_schedule_new_tasks(db_conn, db_cursor, database_config: Database) db_conn.commit() -def update_completed_jobs(db_conn, db_cursor): +def update_completed_jobs(db_cursor, job_type: str): # Update completed jobs if there are any db_cursor.execute(f""" - UPDATE compression_jobs - SET status='{JobStatus.COMPLETED}', duration=TIMESTAMPDIFF(SECOND, start_time, CURRENT_TIMESTAMP()) + UPDATE {job_type}_jobs + SET status='{JobStatus.SUCCEEDED}', duration=TIMESTAMPDIFF(SECOND, start_time, CURRENT_TIMESTAMP()) WHERE status='{JobStatus.SCHEDULED}' AND num_tasks=num_tasks_completed """) + + +def handle_compression_task_update(db_conn, db_cursor, task_update: typing.Union[TaskUpdate, + CompressionTaskSuccessUpdate, + TaskFailureUpdate]): + # Retrieve scheduler state + try: + job = scheduled_jobs[task_update.job_id] + task = job.tasks[task_update.task_id] + except KeyError: + # Scheduler detected response from task which it does not keep track of + # It could be that previous scheduler crashed. 
+ # The only thing we can do is to log, and discard the message + # to prevent infinite loop + logger.warning(f"Discarding update for untracked task: {task_update.json()}") + return + + # Process task update and update database + # Scheduler is aware of the task + now = datetime.datetime.utcnow() + task_duration = 0 + if task.start_time: + task_duration = max(int((now - task.start_time).total_seconds()), 1) + + if TaskStatus.IN_PROGRESS == task_update.status: + # Update sent by worker when task began in the database + update_compression_task_metadata(db_cursor, task_update.task_id, dict( + status=task_update.status, + start_time=now.strftime('%Y-%m-%d %H:%M:%S') + )) + elif TaskStatus.SUCCEEDED == task_update.status: + # Update sent by worker when task finishes + if TaskStatus.IN_PROGRESS != task.status: + logger.warning(f"Discarding task update that's impossible for tracked task: {task_update.json()}") + return + + logger.info(f"Compression task job-{task_update.job_id}-task-{task_update.task_id} completed in " + f"{task_duration} second(s).") + + update_compression_task_metadata(db_cursor, task_update.task_id, dict( + status=task_update.status, + partition_uncompressed_size=task_update.total_uncompressed_size, + partition_compressed_size=task_update.total_compressed_size, + duration=task_duration + )) + increment_compression_job_metadata(db_cursor, task_update.job_id, dict( + uncompressed_size=task_update.total_uncompressed_size, + compressed_size=task_update.total_compressed_size, + num_tasks_completed=1 + )) + elif TaskStatus.FAILED == task_update.status: + logger.error(f"Compression task job-{task_update.job_id}-task-{task_update.task_id} failed with error: " + f"{task_update.error_message}.") + update_compression_task_metadata(db_cursor, task_update.task_id, dict( + status=task_update.status, + duration=task_duration + )) + update_compression_job_metadata(db_cursor, job.id, dict( + status=task_update.status, + status_msg=task_update.error_message + )) + else: + raise NotImplementedError + db_conn.commit() + # Only update scheduler metadata only after transaction finishes + # If update fails, rollback and avoid updating scheduler state + task.status = task_update.status + if TaskStatus.IN_PROGRESS == task_update.status: + task.start_time = now + elif TaskStatus.SUCCEEDED == task_update.status: + job.num_tasks_completed += 1 + elif TaskStatus.FAILED == task_update.status: + # TODO: how to handle failure scheduler state update besides simply recording acknowledgement? 
+ job.status = task_update.status + else: + raise NotImplementedError + -def task_results_consumer(sql_adapter: SQL_Adapter, celery_broker_url): - global scheduled_jobs - global jobs_lock +def handle_search_task_update(db_conn, db_cursor, task_update: typing.Union[TaskUpdate, TaskFailureUpdate]): + global id_to_search_job + try: + job = id_to_search_job[task_update.job_id] + task = job.tasks[task_update.task_id] + except KeyError: + logger.warning(f"Discarding update for untracked task: {task_update.json()}") + return + + now = datetime.datetime.utcnow() + task_duration = 0 + if task.start_time: + task_duration = max(int((now - task.start_time).total_seconds()), 1) + + if TaskStatus.IN_PROGRESS == task_update.status: + update_search_task_metadata(db_cursor, task_update.task_id, dict( + status=task_update.status, + start_time=now.strftime('%Y-%m-%d %H:%M:%S') + )) + elif TaskStatus.SUCCEEDED == task_update.status: + if TaskStatus.IN_PROGRESS != task.status: + logger.warning(f"Discarding task update that's impossible for tracked task: {task_update.json()}") + return + + logger.info(f"Search task job-{task_update.job_id}-task-{task_update.task_id} completed in {task_duration} " + f"second(s).") + + update_search_task_metadata(db_cursor, task_update.task_id, dict( + status=task_update.status, + duration=task_duration + )) + increment_search_job_metadata(db_cursor, task_update.job_id, dict( + num_tasks_completed=1 + )) + elif TaskStatus.FAILED == task_update.status: + logger.error(f"Search task job-{task_update.job_id}-task-{task_update.task_id} failed with error: " + f"{task_update.error_message}.") + update_search_task_metadata(db_cursor, task_update.task_id, dict( + status=task_update.status, + duration=task_duration + )) + update_search_job_metadata(db_cursor, job.id, dict( + status=task_update.status, + status_msg=task_update.error_message + )) + else: + raise NotImplementedError + + db_conn.commit() + + task.status = task_update.status + if TaskStatus.IN_PROGRESS == task_update.status: + task.start_time = now + elif TaskStatus.SUCCEEDED == task_update.status: + job.num_tasks_completed += 1 + elif TaskStatus.FAILED == task_update.status: + job.status = task_update.status + else: + raise NotImplementedError + + +def task_results_consumer(sql_adapter: SQL_Adapter, celery_broker_url): def callback(ch, method, properties, body): global scheduled_jobs + global id_to_search_job global jobs_lock global logger try: # Validate message body task_update = TaskUpdate.parse_raw(body) - if TaskStatus.SUCCEEDED == task_update.status: - task_update = TaskCompletionUpdate.parse_raw(body) - elif TaskStatus.FAILED == task_update.status: + if TaskStatus.FAILED == task_update.status: task_update = TaskFailureUpdate.parse_raw(body) + elif TaskUpdateType.COMPRESSION == task_update.type and\ + TaskStatus.SUCCEEDED == task_update.status: + task_update = CompressionTaskSuccessUpdate.parse_raw(body) except ValidationError as err: logger.error(err) exit(-1) @@ -202,80 +432,11 @@ def callback(ch, method, properties, body): f"task_id={task_update.task_id} " f"status={task_update.status}") - # Retrieve scheduler state - try: - job = scheduled_jobs[task_update.job_id] - task = job.tasks[task_update.task_id] - except KeyError: - # Scheduler detected response from task which it does not keep track of - # It could be that previous scheduler crashed. 
- # The only thing we can do is to log, and discard the message - # to prevent infinite loop - logger.warning(f"Discarding update for untracked task: {task_update.json()}") - ch.basic_ack(method.delivery_tag) - return - - # Process task update and update database try: - # Scheduler is aware of the task - now = datetime.datetime.utcnow() - task_duration = 0 - if task.start_time: - task_duration = max(int((now - task.start_time).total_seconds()), 1) - - if TaskStatus.IN_PROGRESS == task_update.status: - # Update sent by worker when task began in the database - update_task_metadata(db_cursor, task_update.task_id, dict( - status=task_update.status, - start_time=now.strftime('%Y-%m-%d %H:%M:%S') - )) - elif TaskStatus.SUCCEEDED == task_update.status: - # Update sent by worker when task finishes - if TaskStatus.IN_PROGRESS != task.status: - logger.warning(f"Discarding task update that's impossible for tracked task: {task_update.json()}") - ch.basic_ack(method.delivery_tag) - raise NotImplementedError - - logger.info(f"Task job-{task_update.job_id}-task-{task_update.task_id} completed in {task_duration} " - f"second(s).") - - update_task_metadata(db_cursor, task_update.task_id, dict( - status=task_update.status, - partition_uncompressed_size=task_update.total_uncompressed_size, - partition_compressed_size=task_update.total_compressed_size, - duration=task_duration - )) - increment_job_metadata(db_cursor, task_update.job_id, dict( - uncompressed_size=task_update.total_uncompressed_size, - compressed_size=task_update.total_compressed_size, - num_tasks_completed=1 - )) - elif TaskStatus.FAILED == task_update.status: - logger.error(f"Task job-{task_update.job_id}-task-{task_update.task_id} failed with error: " - f"{task_update.error_message}.") - update_task_metadata(db_cursor, task_update.task_id, dict( - status=task_update.status, - duration=task_duration - )) - update_job_metadata(db_cursor, job.id, dict( - status=task_update.status, - status_msg=task_update.error_message - )) - else: - raise NotImplementedError - - db_conn.commit() - - # Only update scheduler metadata only after transaction finishes - # If update fails, rollback and avoid updating scheduler state - task.status = task_update.status - if TaskStatus.IN_PROGRESS == task_update.status: - task.start_time = now - elif TaskStatus.SUCCEEDED == task_update.status: - job.num_tasks_completed += 1 - elif TaskStatus.FAILED == task_update.status: - # TODO: how to handle failure scheduler state update besides simply recording acknowledgement? 
- job.status = task_update.status + if TaskUpdateType.COMPRESSION == task_update.type: + handle_compression_task_update(db_conn, db_cursor, task_update) + elif TaskUpdateType.SEARCH == task_update.type: + handle_search_task_update(db_conn, db_cursor, task_update) else: raise NotImplementedError @@ -322,7 +483,9 @@ def main(argv): with closing(sql_adapter.create_connection(True)) as db_conn, \ closing(db_conn.cursor(dictionary=True)) as db_cursor: search_and_schedule_new_tasks(db_conn, db_cursor, sql_adapter.database_config) - update_completed_jobs(db_conn, db_cursor) + update_completed_jobs(db_cursor, 'compression') + update_completed_jobs(db_cursor, 'search') + db_conn.commit() except: logger.exception("Error in scheduling.") finally: diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index 6c54fcc49..8a416a73a 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -8,9 +8,18 @@ from celery.result import AsyncResult from pydantic import BaseModel, validator +TASK_QUEUE_LOWEST_PRIORITY = 1 +TASK_QUEUE_HIGHEST_PRIORITY = 3 + class QueueName: COMPRESSION = "compression" + SEARCH = "search" + + +class TaskUpdateType: + COMPRESSION = 'COMPRESSION' + SEARCH = 'SEARCH' class TaskStatus: @@ -22,10 +31,18 @@ class TaskStatus: class TaskUpdate(BaseModel): + type: str job_id: int task_id: int status: str + @validator('type') + def validate_type(cls, field): + supported_types = [TaskUpdateType.COMPRESSION, TaskUpdateType.SEARCH] + if field not in supported_types: + raise ValueError(f"Unsupported task update type: '{field}'") + return field + @validator('status') def valid_status(cls, field): supported_status = [TaskStatus.IN_PROGRESS, TaskStatus.SUCCEEDED, TaskStatus.FAILED] @@ -34,19 +51,19 @@ def valid_status(cls, field): return field -class TaskCompletionUpdate(TaskUpdate): - total_uncompressed_size: int - total_compressed_size: int - - class TaskFailureUpdate(TaskUpdate): error_message: str -class Task(BaseModel): +class CompressionTaskSuccessUpdate(TaskUpdate): + total_uncompressed_size: int + total_compressed_size: int + + +class CompressionTask(BaseModel): id: int status: str - priority: int = 1 + priority: int = TASK_QUEUE_LOWEST_PRIORITY clp_paths_to_compress: bytes start_time: datetime.datetime = None instance: AsyncResult = None @@ -60,23 +77,54 @@ def get_clp_paths_to_compress_json(self, dctx: zstandard.ZstdDecompressor = None return json.dumps(msgpack.unpackb(dctx.decompress(self.clp_paths_to_compress))) +class SearchTask(BaseModel): + id: int + status: str + priority: int = TASK_QUEUE_HIGHEST_PRIORITY + start_time: typing.Optional[datetime.datetime] = None + job_id: int + archive_id: str + cancelled: bool = False + instance: AsyncResult = None + + # This is necessary so we can store an AsyncResult in the task even though + # pydantic has no validator for it + class Config: + arbitrary_types_allowed = True + + class JobStatus: SCHEDULING = 'SCHEDULING' SCHEDULED = 'SCHEDULED' - COMPLETED = 'COMPLETED' + SUCCEEDED = 'SUCCEEDED' FAILED = 'FAILED' -class Job(BaseModel): +class CompressionJob(BaseModel): id: int status: str start_time: datetime.datetime clp_config: bytes num_tasks: typing.Optional[int] num_tasks_completed: int - tasks: Dict[int, Task] = {} + tasks: Dict[int, CompressionTask] = {} def get_clp_config_json(self, dctx: zstandard.ZstdDecompressor = 
None): if not dctx: dctx = zstandard.ZstdDecompressor() return json.dumps(msgpack.unpackb(dctx.decompress(self.clp_config))) + + +class SearchJob(BaseModel): + id: int + search_config: bytes + status: str + start_time: datetime.datetime + num_tasks: int + num_tasks_completed: int = 0 + tasks: Dict[int, SearchTask] = {} + + def get_search_config_json_str(self, dctx: zstandard.ZstdDecompressor = None): + if not dctx: + dctx = zstandard.ZstdDecompressor() + return json.dumps(msgpack.unpackb(dctx.decompress(self.search_config))) diff --git a/components/package-template/src/sbin/native/search b/components/package-template/src/sbin/native/search index 58c5e41a5..e9b2de1f8 100755 --- a/components/package-template/src/sbin/native/search +++ b/components/package-template/src/sbin/native/search @@ -1,11 +1,16 @@ #!/usr/bin/env python3 import argparse +import asyncio +import datetime import logging +import multiprocessing import os import pathlib -import subprocess +import socket import sys -import uuid +import time +from asyncio import StreamReader, StreamWriter +from contextlib import closing # Setup logging # Create logger @@ -55,8 +60,169 @@ clp_home = get_clp_home() if clp_home is None or not load_bundled_python_lib_path(clp_home): sys.exit(-1) -import yaml +import msgpack +import zstandard + from clp.package_utils import CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, validate_and_load_config_file +from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, Database +from clp_py_utils.sql_adapter import SQL_Adapter +from job_orchestration.job_config import SearchConfig +from job_orchestration.scheduler.scheduler_data import JobStatus + + +async def run_function_in_process(function, *args, initializer=None, init_args=None): + """ + Runs the given function in a separate process wrapped in a *cancellable* + asyncio task. This is necessary because asyncio's multiprocessing process + cannot be cancelled once it's started. 
+ :param function: Method to run + :param args: Arguments for the method + :param initializer: Initializer for each process in the pool + :param init_args: Arguments for the initializer + :return: Return value of the method + """ + pool = multiprocessing.Pool(1, initializer, init_args) + + loop = asyncio.get_event_loop() + fut = loop.create_future() + + def process_done_callback(obj): + loop.call_soon_threadsafe(fut.set_result, obj) + + def process_error_callback(err): + loop.call_soon_threadsafe(fut.set_exception, err) + + pool.apply_async(function, args, callback=process_done_callback, error_callback=process_error_callback) + + try: + return await fut + except asyncio.CancelledError: + pass + finally: + pool.terminate() + pool.close() + + +def create_and_monitor_job_in_db(db_config: Database, wildcard_query: str, path_filter: str, + search_controller_host: str, search_controller_port: int): + search_config = SearchConfig( + search_controller_host=search_controller_host, + search_controller_port=search_controller_port, + wildcard_query=wildcard_query, + path_filter=path_filter + ) + + sql_adapter = SQL_Adapter(db_config) + zstd_cctx = zstandard.ZstdCompressor(level=3) + with closing(sql_adapter.create_connection(True)) as db_conn, closing(db_conn.cursor(dictionary=True)) as db_cursor: + # Create job + db_cursor.execute(f"INSERT INTO `search_jobs` (`search_config`) VALUES (%s)", + (zstd_cctx.compress(msgpack.packb(search_config.dict())),)) + db_conn.commit() + job_id = db_cursor.lastrowid + + # Create a task for each archive, in batches + next_pagination_id = 0 + pagination_limit = 64 + num_tasks_added = 0 + while True: + # Get next `limit` rows + db_cursor.execute( + f""" + SELECT `id` FROM {CLP_METADATA_TABLE_PREFIX}archives + WHERE `pagination_id` >= {next_pagination_id} + LIMIT {pagination_limit} + """ + ) + rows = db_cursor.fetchall() + + # Insert tasks + db_cursor.execute(f""" + INSERT INTO `search_tasks` (`job_id`, `archive_id`, `scheduled_time`) + VALUES ({"), (".join(f"{job_id}, '{row['id']}', '{datetime.datetime.utcnow()}'" for row in rows)}) + """) + db_conn.commit() + num_tasks_added += len(rows) + + if len(rows) < pagination_limit: + # Less than limit rows returned, so there are no more rows + break + next_pagination_id += pagination_limit + + # Mark job as scheduled + db_cursor.execute(f""" + UPDATE `search_jobs` + SET num_tasks={num_tasks_added}, status = '{JobStatus.SCHEDULED}' + WHERE id = {job_id} + """) + db_conn.commit() + + # Wait for the job to be marked complete + job_complete = False + while not job_complete: + db_cursor.execute(f"SELECT `status`, `status_msg` FROM `search_jobs` WHERE `id` = {job_id}") + # There will only ever be one row since it's impossible to have more than one job with the same ID + row = db_cursor.fetchall()[0] + if JobStatus.SUCCEEDED == row['status']: + job_complete = True + elif JobStatus.FAILED == row['status']: + logger.error(row['status_msg']) + job_complete = True + db_conn.commit() + + time.sleep(1) + + +async def worker_connection_handler(reader: StreamReader, writer: StreamWriter): + try: + unpacker = msgpack.Unpacker() + while True: + # Read some data from the worker and feed it to msgpack + buf = await reader.read(1024) + if b'' == buf: + # Worker closed + return + unpacker.feed(buf) + + # Print out any messages we can decode + for unpacked in unpacker: + print(f"{unpacked[0]}: {unpacked[2]}", end='') + except asyncio.CancelledError: + return + finally: + writer.close() + + +async def do_search(db_config: Database, wildcard_query: 
str, path_filter: str, host: str): + # Start server to receive and print results + try: + server = await asyncio.start_server(client_connected_cb=worker_connection_handler, host=host, port=0, + family=socket.AF_INET) + except asyncio.CancelledError: + # Search cancelled + return + port = server.sockets[0].getsockname()[1] + + server_task = asyncio.ensure_future(server.serve_forever()) + + db_monitor_task = asyncio.ensure_future( + run_function_in_process(create_and_monitor_job_in_db, db_config, wildcard_query, path_filter, host, port)) + + # Wait for the job to complete or an error to occur + pending = [server_task, db_monitor_task] + try: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + if db_monitor_task in done: + server.close() + await server.wait_closed() + else: + logger.error("server task unexpectedly returned") + db_monitor_task.cancel() + await db_monitor_task + except asyncio.CancelledError: + server.close() + await server.wait_closed() + await db_monitor_task def main(argv): @@ -77,30 +243,18 @@ def main(argv): logger.exception("Failed to load config.") return -1 - # Generate database config file for clp - db_config_file_path = clp_config.logs_directory / f'decompress-db-config-{uuid.uuid4()}.yml' - db_config_file = open(db_config_file_path, 'w') - yaml.safe_dump(clp_config.database.get_clp_connection_params_and_type(True), db_config_file) - db_config_file.close() - - cmd = [ - str(clp_home / 'bin' / 'clg'), - str(clp_config.archive_output.directory), parsed_args.wildcard_query, - '--db-config-file', str(db_config_file_path), - ] - if parsed_args.file_path is not None: - cmd.append(parsed_args.file_path) - - proc = subprocess.Popen(cmd) - return_code = proc.wait() - if 0 != return_code: - logger.error(f"Search failed, return_code={return_code}") - return return_code - - # Remove generated files - db_config_file_path.unlink() - - return return_code + # Get IP of local machine + host_ip = None + for ip in set(socket.gethostbyname_ex(socket.gethostname())[2]): + host_ip = ip + break + if host_ip is None: + logger.error("Could not determine IP of local machine.") + return -1 + + asyncio.run(do_search(clp_config.database, parsed_args.wildcard_query, parsed_args.file_path, host_ip)) + + return 0 if '__main__' == __name__: diff --git a/components/package-template/src/sbin/search b/components/package-template/src/sbin/search index 3d8b303b1..973b954ba 100755 --- a/components/package-template/src/sbin/search +++ b/components/package-template/src/sbin/search @@ -10,7 +10,7 @@ import uuid # Setup logging # Create logger logger = logging.getLogger(__file__) -logger.setLevel(logging.DEBUG) +logger.setLevel(logging.INFO) # Setup console logging logging_console_handler = logging.StreamHandler() logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") diff --git a/components/package-template/src/sbin/start-clp b/components/package-template/src/sbin/start-clp index d189dd300..b46b6dead 100755 --- a/components/package-template/src/sbin/start-clp +++ b/components/package-template/src/sbin/start-clp @@ -376,7 +376,7 @@ def start_worker(instance_id: str, clp_config: CLPConfig, container_clp_config: 'worker', '--concurrency', str(num_cpus), '--loglevel', 'WARNING', - '-Q', f'{QueueName.COMPRESSION}', + '-Q', f"{QueueName.COMPRESSION},{QueueName.SEARCH}", ] cmd = container_start_cmd + worker_cmd subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) diff --git a/tools/packager/install-scripts/install-core.sh 
b/tools/packager/install-scripts/install-core.sh index 2c8256cce..2650769cc 100755 --- a/tools/packager/install-scripts/install-core.sh +++ b/tools/packager/install-scripts/install-core.sh @@ -7,7 +7,7 @@ build_dir=/tmp/core-build mkdir ${build_dir} cd ${build_dir} -exes="clp clg" +exes="clg clp clo" cmake ${WORKING_DIR}/core make -j${BUILD_PARALLELISM} ${exes}
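
Note: the scheduler changes above multiplex compression and search task updates over a single results queue by tagging each message with a `type` field and re-parsing it into the narrower pydantic model. Below is a minimal sketch (not part of the patch) of that round trip; it assumes the patched job_orchestration package is importable, and the job/task IDs are illustrative.

# Sketch only: mirrors the scheduler callback's parse-and-narrow logic for the
# typed task updates defined in scheduler_data.py. Assumes the patched
# job_orchestration package is on the Python path.
from job_orchestration.scheduler.scheduler_data import (
    CompressionTaskSuccessUpdate,
    TaskFailureUpdate,
    TaskStatus,
    TaskUpdate,
    TaskUpdateType,
)


def parse_task_update(body: bytes) -> TaskUpdate:
    # Parse the generic update first, then re-parse as the narrower model
    # implied by the message's type and status, as the callback does
    task_update = TaskUpdate.parse_raw(body)
    if TaskStatus.FAILED == task_update.status:
        return TaskFailureUpdate.parse_raw(body)
    if TaskUpdateType.COMPRESSION == task_update.type \
            and TaskStatus.SUCCEEDED == task_update.status:
        return CompressionTaskSuccessUpdate.parse_raw(body)
    return task_update


# A search worker reporting that task 7 of job 3 has started (example IDs)
started = TaskUpdate(type=TaskUpdateType.SEARCH, job_id=3, task_id=7,
                     status=TaskStatus.IN_PROGRESS)
update = parse_task_update(started.json().encode('utf-8'))
assert TaskUpdateType.SEARCH == update.type

The native search client receives results over a plain TCP stream of msgpack-encoded records; worker_connection_handler feeds the raw bytes into a msgpack.Unpacker and prints the first and third fields of each record. A small sketch of that framing follows, with the record layout and values assumed rather than taken from the patch.

# Sketch only: the worker-to-client result stream decoded by
# worker_connection_handler. Each record is a msgpack-encoded sequence; the
# handler prints elements 0 and 2, so this example assumes a
# (path, _, message) layout.
import msgpack

records = [
    ('var/log/example.log', 0, 'example log message 1\n'),
    ('var/log/example.log', 1, 'example log message 2\n'),
]
stream = b''.join(msgpack.packb(record) for record in records)

unpacker = msgpack.Unpacker()
unpacker.feed(stream)
for unpacked in unpacker:
    print(f"{unpacked[0]}: {unpacked[2]}", end='')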