Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic implementation of distributed search following the conventions of distributed compression using Celery. #68

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,41 @@ def main(argv):
) ROW_FORMAT=DYNAMIC
""")

scheduling_db_cursor.execute(f"""
CREATE TABLE IF NOT EXISTS `search_jobs` (
`id` INT NOT NULL AUTO_INCREMENT,
`status` VARCHAR(16) NOT NULL DEFAULT '{JobStatus.SCHEDULING}',
`status_msg` VARCHAR(255) NOT NULL DEFAULT '',
`creation_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`start_time` DATETIME NULL DEFAULT NULL,
`duration` INT NULL DEFAULT NULL,
`num_tasks` INT NOT NULL DEFAULT '0',
`num_tasks_completed` INT NOT NULL DEFAULT '0',
`clp_binary_version` INT NULL DEFAULT NULL,
`search_config` VARBINARY(60000) NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `JOB_STATUS` (`status`) USING BTREE
) ROW_FORMAT=DYNAMIC
""")

scheduling_db_cursor.execute(f"""
CREATE TABLE IF NOT EXISTS `search_tasks` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`status` VARCHAR(16) NOT NULL DEFAULT '{TaskStatus.SUBMITTED}',
`scheduled_time` DATETIME NULL DEFAULT NULL,
`start_time` DATETIME NULL DEFAULT NULL,
`duration` SMALLINT NULL DEFAULT NULL,
`job_id` INT NOT NULL,
`archive_id` VARCHAR(64) NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
INDEX `job_id` (`job_id`) USING BTREE,
INDEX `TASK_STATUS` (`status`) USING BTREE,
INDEX `TASK_START_TIME` (`start_time`) USING BTREE,
CONSTRAINT `search_tasks` FOREIGN KEY (`job_id`)
REFERENCES `search_jobs` (`id`) ON UPDATE NO ACTION ON DELETE NO ACTION
) ROW_FORMAT=DYNAMIC
""")

scheduling_db.commit()
except:
logger.exception("Failed to create scheduling tables.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def handle_job(scheduling_db, scheduling_db_cursor, clp_io_config: ClpIoConfig,

if JobStatus.SCHEDULED == job_status:
pass # Simply wait another iteration
elif JobStatus.COMPLETED == job_status:
elif JobStatus.SUCCEEDED == job_status:
# All tasks in the job are done
speed = 0
if not no_progress_reporting:
Expand Down
16 changes: 15 additions & 1 deletion components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ else()
message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for MariaDBClient")
endif()

# Find and setup msgpack
find_package(msgpack 4.1.1 REQUIRED)
if(msgpack_FOUND)
message(STATUS "Found msgpack ${msgpack_VERSION}")
else()
message(FATAL_ERROR "Could not find msgpack")
endif()

# Add yaml-cpp
add_subdirectory(submodules/yaml-cpp EXCLUDE_FROM_ALL)

Expand Down Expand Up @@ -489,14 +497,20 @@ set(SOURCE_FILES_clo
submodules/sqlite3/sqlite3.c
submodules/sqlite3/sqlite3.h
submodules/sqlite3/sqlite3ext.h
)
src/networking/socket_utils.cpp
src/networking/socket_utils.hpp
src/networking/SocketOperationFailed.cpp
src/networking/SocketOperationFailed.hpp
src/PThread.cpp src/PThread.hpp src/clo/ControllerMonitoringThread.cpp src/clo/ControllerMonitoringThread.hpp)
add_executable(clo ${SOURCE_FILES_clo})
target_link_libraries(clo
PRIVATE
Boost::filesystem Boost::iostreams Boost::program_options
fmt::fmt
msgpackc-cxx
spdlog::spdlog
${sqlite_LIBRARY_DEPENDENCIES}
Threads::Threads
ZStd::ZStd
)
target_compile_features(clo
Expand Down
1 change: 1 addition & 0 deletions components/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ so we've included some scripts to download, compile, and install them:
./tools/scripts/lib_install/libarchive.sh 3.5.1
./tools/scripts/lib_install/lz4.sh 1.8.2
./tools/scripts/lib_install/mariadb-connector-c.sh 3.2.3
./tools/scripts/lib_install/msgpack.sh 4.1.1
./tools/scripts/lib_install/spdlog.sh 1.9.2
./tools/scripts/lib_install/zstandard.sh 1.4.9
```
Expand Down
58 changes: 58 additions & 0 deletions components/core/src/PThread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include "PThread.hpp"

// C++ standard libraries
#include <cerrno>

// spdlog
#include <spdlog/spdlog.h>

// Project headers
#include "Defs.h"

/**
 * Cancels the underlying thread if it is still marked as running when this
 * object is destroyed. A failure to cancel is logged rather than thrown.
 */
PThread::~PThread () {
    // Nothing to clean up if the thread already marked itself as exited
    if (!m_thread_running) {
        return;
    }

    SPDLOG_WARN("PThread did not exit before being destroyed.");
    auto cancel_result = pthread_cancel(m_thread);
    if (0 == cancel_result) {
        return;
    }
    SPDLOG_ERROR("Failed to cancel thread, errno={}", cancel_result);
}

/**
 * Creates the underlying POSIX thread, which begins executing
 * thread_entry_point with this object as its argument.
 * @throw PThread::OperationFailed with ErrorCode_errno if the thread could not
 * be created; errno holds the error number returned by pthread_create
 */
void PThread::start () {
    // The running flag must be set *before* the thread is created: the new
    // thread may finish and call mark_as_exited() before pthread_create even
    // returns, and setting the flag afterwards would overwrite that update and
    // leave the object permanently marked as running.
    m_thread_running = true;
    int returned_value = pthread_create(&m_thread, nullptr, thread_entry_point, this);
    if (0 != returned_value) {
        m_thread_running = false;
        SPDLOG_ERROR("Failed to start thread, errno={}", returned_value);
        // pthread_create reports failure through its return value and does not
        // set errno, so publish the error number for ErrorCode_errno consumers
        errno = returned_value;
        throw OperationFailed(ErrorCode_errno, __FILENAME__, __LINE__);
    }
}

/**
 * Detaches the underlying thread.
 * @throw PThread::OperationFailed with ErrorCode_errno on failure; errno holds
 * the error number returned by pthread_detach
 */
void PThread::detach () {
    int returned_value = pthread_detach(m_thread);
    if (0 != returned_value) {
        // pthread_detach reports failure through its return value and does not
        // set errno, so publish the error number for ErrorCode_errno consumers
        errno = returned_value;
        throw OperationFailed(ErrorCode_errno, __FILENAME__, __LINE__);
    }
}

/**
 * Tries to join with the underlying thread.
 * @param thread_return_value Receives the thread's return value on success
 * @return ErrorCode_errno on failure, with errno set to the error number
 * returned by pthread_join
 * @return ErrorCode_Success otherwise
 */
ErrorCode PThread::try_join (void*& thread_return_value) {
    auto return_value = pthread_join(m_thread, &thread_return_value);
    if (0 != return_value) {
        // pthread_join reports failure through its return value and does not
        // set errno, so publish the error number for ErrorCode_errno consumers
        errno = return_value;
        return ErrorCode_errno;
    }
    return ErrorCode_Success;
}

/**
 * Joins with the underlying thread.
 * @return The value returned by the thread
 * @throw PThread::OperationFailed carrying try_join's error code on failure
 */
void* PThread::join () {
    void* exit_value;
    const auto join_result = try_join(exit_value);
    if (ErrorCode_Success == join_result) {
        return exit_value;
    }
    throw OperationFailed(join_result, __FILENAME__, __LINE__);
}

/**
 * Trampoline handed to pthread_create: runs the object's thread_method and
 * then marks the object as exited.
 * @param arg The PThread object whose thread_method should be run
 * @return Same as PThread::thread_method
 */
void* PThread::thread_entry_point (void* arg) {
    auto* self = static_cast<PThread*>(arg);
    void* result = self->thread_method();
    self->mark_as_exited();
    return result;
}
84 changes: 84 additions & 0 deletions components/core/src/PThread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#ifndef PTHREAD_HPP
#define PTHREAD_HPP

// C standard libraries
#include <pthread.h>

// C++ standard libraries
#include <atomic>

// Project headers
#include "ErrorCode.hpp"
#include "TraceableException.hpp"

/**
* C++ wrapper for POSIX threads
*/
/**
 * Object-oriented wrapper around a POSIX thread. Subclasses provide the
 * thread's body by overriding thread_method().
 */
class PThread {
public:
    // Types
    class OperationFailed : public TraceableException {
    public:
        // Constructors
        OperationFailed (ErrorCode error_code, const char* const filename, int line_number) :
                TraceableException(error_code, filename, line_number) {}

        // Methods
        const char* what () const noexcept override {
            return "PThread operation failed";
        }
    };

    // Constructors
    /**
     * Creates a wrapper with no thread started yet
     */
    PThread () : m_thread{}, m_thread_running{false} {}

    // Destructor
    /**
     * Cancels the thread if it's still marked as running
     */
    virtual ~PThread ();

    // Methods
    /**
     * Starts the thread
     * @throw PThread::OperationFailed if the thread couldn't be created
     */
    void start ();
    /**
     * Detaches the thread so its resources are released without being joined
     * @throw PThread::OperationFailed on failure
     */
    void detach ();
    /**
     * Tries to join with the thread
     * @param thread_return_value The thread's return value
     * @return ErrorCode_errno on failure
     * @return ErrorCode_Success otherwise
     */
    ErrorCode try_join (void*& thread_return_value);
    /**
     * Joins with the thread
     * @return The thread's return value
     * @throw PThread::OperationFailed on failure
     */
    void* join ();

    /**
     * @return Whether the thread has been started and hasn't yet been marked
     * as exited
     */
    bool is_running () const { return m_thread_running.load(); }

protected:
    // Methods
    /**
     * The body of the thread, implemented by subclasses
     * @return The value to hand back to whoever joins the thread
     */
    virtual void* thread_method () = 0;
    /**
     * Indicates that the thread has exited
     */
    void mark_as_exited () { m_thread_running.store(false); }

private:
    // Methods
    /**
     * Entry-point method for the thread
     * @param arg
     * @return Same as PThread::thread_method
     */
    static void* thread_entry_point (void* arg);

    // Variables
    pthread_t m_thread;
    std::atomic_bool m_thread_running;
};

#endif //PTHREAD_HPP
22 changes: 19 additions & 3 deletions components/core/src/clo/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,15 @@ namespace clo {
// Define hidden positional options (not shown in Boost's program options help message)
po::options_description hidden_positional_options;
hidden_positional_options.add_options()
("search-controller-host", po::value<string>(&m_search_controller_host))
("search-controller-port", po::value<string>(&m_search_controller_port))
("archive-path", po::value<string>(&m_archive_path))
("wildcard-string", po::value<string>(&m_search_string))
("file-path", po::value<string>(&m_file_path))
;
po::positional_options_description positional_options_description;
positional_options_description.add("search-controller-host", 1);
positional_options_description.add("search-controller-port", 1);
positional_options_description.add("archive-path", 1);
positional_options_description.add("wildcard-string", 1);
positional_options_description.add("file-path", 1);
Expand Down Expand Up @@ -117,8 +121,9 @@ namespace clo {
cerr << endl;

cerr << "Examples:" << endl;
cerr << R"( # Search ARCHIVE_PATH for " ERROR ")" << endl;
cerr << " " << get_program_name() << R"( ARCHIVE_PATH " ERROR ")" << endl;
cerr << R"( # Search ARCHIVE_PATH for " ERROR " and send results to the controller at localhost:5555)"
<< endl;
cerr << " " << get_program_name() << R"( localhost 5555 ARCHIVE_PATH " ERROR ")" << endl;
cerr << endl;

cerr << "Options can be specified on the command line or through a configuration file." << endl;
Expand All @@ -132,6 +137,16 @@ namespace clo {
return ParsingResult::InfoCommand;
}

// Validate search controller host was specified
if (m_search_controller_host.empty()) {
throw invalid_argument("SEARCH_CONTROLLER_HOST not specified or empty.");
}

// Validate search controller port was specified
if (m_search_controller_port.empty()) {
throw invalid_argument("SEARCH_CONTROLLER_PORT not specified or empty.");
}

// Validate archive path was specified
if (m_archive_path.empty()) {
throw invalid_argument("ARCHIVE_PATH not specified or empty.");
Expand Down Expand Up @@ -190,6 +205,7 @@ namespace clo {
}

void CommandLineArguments::print_basic_usage () const {
cerr << "Usage: " << get_program_name() << R"( [OPTIONS] ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl;
cerr << "Usage: " << get_program_name() << " [OPTIONS] SEARCH_CONTROLLER_HOST SEARCH_CONTROLLER_PORT "
<< R"(ARCHIVE_PATH "WILDCARD STRING" [FILE])" << endl;
}
}
4 changes: 4 additions & 0 deletions components/core/src/clo/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace clo {
// Methods
ParsingResult parse_arguments (int argc, const char* argv[]) override;

const std::string& get_search_controller_host () const { return m_search_controller_host; }
const std::string& get_search_controller_port () const { return m_search_controller_port; }
const std::string& get_archive_path () const { return m_archive_path; }
bool ignore_case () const { return m_ignore_case; }
const std::string& get_search_string () const { return m_search_string; }
Expand All @@ -34,6 +36,8 @@ namespace clo {
void print_basic_usage () const override;

// Variables
std::string m_search_controller_host;
std::string m_search_controller_port;
std::string m_archive_path;
bool m_ignore_case;
std::string m_search_string;
Expand Down
47 changes: 47 additions & 0 deletions components/core/src/clo/ControllerMonitoringThread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "ControllerMonitoringThread.hpp"

// C standard libraries
#include <unistd.h>

// spdlog
#include <spdlog/spdlog.h>

// Project headers
#include "../networking/socket_utils.hpp"

/**
 * Receives from the controller socket until the controller closes the
 * connection (which flags the query as cancelled) or a receive error occurs.
 * The socket is closed before the thread returns.
 * @return nullptr
 */
void* ControllerMonitoringThread::thread_method () {
    // Wait for the controller socket to close
    constexpr size_t cBufLen = 4096;
    char buf[cBufLen];
    // Initialized so that no path can ever read an indeterminate value
    size_t num_bytes_received = 0;
    for (bool done = false; false == done;) {
        auto error_code = networking::try_receive(m_controller_socket_fd, buf, cBufLen, num_bytes_received);
        switch (error_code) {
            case ErrorCode_EndOfFile:
                // Controller closed the connection
                m_query_cancelled = true;
                done = true;
                break;
            case ErrorCode_Success:
                // Unexpectedly received data; keep waiting for the connection to close
                SPDLOG_ERROR("Unexpected received {} bytes of data from controller.", num_bytes_received);
                break;
            case ErrorCode_BadParam:
                // The message has no placeholder, so no argument is passed
                SPDLOG_ERROR("Bad parameter sent to try_receive.");
                done = true;
                break;
            case ErrorCode_errno:
                SPDLOG_ERROR("Failed to receive data from controller, errno={}.", errno);
                done = true;
                break;
            default:
                SPDLOG_ERROR("Unexpected error from try_receive, error_code={}.", error_code);
                done = true;
                break;
        }
    }

    close(m_controller_socket_fd);

    return nullptr;
}
29 changes: 29 additions & 0 deletions components/core/src/clo/ControllerMonitoringThread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef CONTROLLERMONITORINGTHREAD_HPP
#define CONTROLLERMONITORINGTHREAD_HPP

// Project headers
#include "../PThread.hpp"

/**
* A thread that waits for the controller to close the connection at which time
* it will indicate the query has been cancelled.
*/
/**
 * Thread that watches the controller's socket; once the controller closes the
 * connection, the query is flagged as cancelled.
 */
class ControllerMonitoringThread : public PThread {
public:
    // Constructor
    /**
     * @param controller_socket_fd File descriptor of the socket connected to
     * the controller
     */
    ControllerMonitoringThread (int controller_socket_fd) :
            m_controller_socket_fd{controller_socket_fd},
            m_query_cancelled{false} {}

    /**
     * @return The flag indicating whether the query has been cancelled
     */
    const std::atomic_bool& get_query_cancelled () const { return m_query_cancelled; }

protected:
    // Methods
    /**
     * The body of the monitoring thread
     * @return See the implementation
     */
    void* thread_method () override;

private:
    // Variables
    int m_controller_socket_fd;
    std::atomic_bool m_query_cancelled;
};

#endif //CONTROLLERMONITORINGTHREAD_HPP
Loading