Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clp-s: Add archive_id to search results. #435

Merged
merged 21 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
#include "ArchiveReader.hpp"

#include <filesystem>
#include <string_view>

#include "archive_constants.hpp"
#include "ReaderUtils.hpp"

using std::string_view;

namespace clp_s {
void ArchiveReader::open(std::string const& archive_path) {
void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
if (m_is_open) {
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
}
m_is_open = true;
m_archive_path = archive_path;
m_archive_id = archive_id;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
std::filesystem::path archive_path{archives_dir};
archive_path /= m_archive_id;
auto const archive_path_str = archive_path.string();

m_var_dict = ReaderUtils::get_variable_dictionary_reader(m_archive_path);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(m_archive_path);
m_array_dict = ReaderUtils::get_array_dictionary_reader(m_archive_path);
m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(m_archive_path);
m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str);
m_array_dict = ReaderUtils::get_array_dictionary_reader(archive_path_str);
m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(archive_path_str);

m_schema_tree = ReaderUtils::read_schema_tree(m_archive_path);
m_schema_map = ReaderUtils::read_schemas(m_archive_path);
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_tables_file_reader.open(m_archive_path + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(m_archive_path + constants::cArchiveTableMetadataFile);
m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
}

void ArchiveReader::read_metadata() {
Expand Down
11 changes: 7 additions & 4 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <map>
#include <set>
#include <span>
#include <string_view>
#include <utility>

#include <boost/filesystem.hpp>
Expand All @@ -29,9 +30,10 @@ class ArchiveReader {

/**
* Opens an archive for reading.
* @param archive_path
* @param archives_dir
* @param archive_id
*/
void open(std::string const& archive_path);
void open(std::string_view archives_dir, std::string_view archive_id);

/**
* Reads the dictionaries and metadata.
Expand Down Expand Up @@ -92,6 +94,8 @@ class ArchiveReader {
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);

std::string_view get_archive_id() { return m_archive_id; }

std::shared_ptr<VariableDictionaryReader> get_variable_dictionary() { return m_var_dict; }

std::shared_ptr<LogTypeDictionaryReader> get_log_type_dictionary() { return m_log_dict; }
Expand Down Expand Up @@ -161,8 +165,7 @@ class ArchiveReader {
);

bool m_is_open;
std::string m_archive_path;

std::string m_archive_id;
std::shared_ptr<VariableDictionaryReader> m_var_dict;
std::shared_ptr<LogTypeDictionaryReader> m_log_dict;
std::shared_ptr<LogTypeDictionaryReader> m_array_dict;
Expand Down
11 changes: 7 additions & 4 deletions components/core/src/clp_s/JsonConstructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
namespace clp_s {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
: m_output_dir(option.output_dir),
m_archives_dir(option.archives_dir) {
m_archives_dir(option.archives_dir),
m_archive_id(option.archive_id) {
std::error_code error_code;
if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) {
throw OperationFailed(
Expand All @@ -28,12 +29,14 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
);
}

if (false == std::filesystem::is_directory(m_archives_dir)) {
std::filesystem::path archive_path{m_archives_dir};
archive_path /= m_archive_id;
if (false == std::filesystem::is_directory(archive_path)) {
throw OperationFailed(
ErrorCodeFailure,
__FILENAME__,
__LINE__,
fmt::format("'{}' is not a directory", m_archives_dir)
fmt::format("'{}' is not a directory", archive_path.c_str())
);
}
}
Expand All @@ -43,7 +46,7 @@ void JsonConstructor::store() {
writer.open(m_output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending);

m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_archives_dir);
m_archive_reader->open(m_archives_dir, m_archive_id);
m_archive_reader->read_dictionaries_and_metadata();
m_archive_reader->store(writer);
m_archive_reader->close();
Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp_s/JsonConstructor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
namespace clp_s {
struct JsonConstructorOption {
std::string archives_dir;
std::string archive_id;
std::string output_dir;
};

Expand Down Expand Up @@ -50,6 +51,7 @@ class JsonConstructor {

private:
std::string m_archives_dir;
std::string m_archive_id;
std::string m_output_dir;

std::unique_ptr<ArchiveReader> m_archive_reader;
Expand Down
13 changes: 6 additions & 7 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,11 @@ int main(int argc, char const* argv[]) {

clp_s::JsonConstructorOption option;
option.output_dir = command_line_arguments.get_output_dir();
option.archives_dir = archives_dir;
try {
auto const& archive_id = command_line_arguments.get_archive_id();
if (false == archive_id.empty()) {
option.archives_dir = std::filesystem::path{archives_dir} / archive_id;
option.archive_id = archive_id;
decompress_archive(option);
} else {
for (auto const& entry : std::filesystem::directory_iterator(archives_dir)) {
Expand All @@ -283,7 +284,7 @@ int main(int argc, char const* argv[]) {
continue;
}

option.archives_dir = entry.path();
option.archive_id = entry.path().filename();
decompress_archive(option);
}
}
Expand Down Expand Up @@ -330,10 +331,7 @@ int main(int argc, char const* argv[]) {
auto const& archive_id = command_line_arguments.get_archive_id();
auto archive_reader = std::make_shared<clp_s::ArchiveReader>();
if (false == archive_id.empty()) {
std::filesystem::path const archives_dir_path{archives_dir};
std::string const archive_path{archives_dir_path / archive_id};

archive_reader->open(archive_path);
archive_reader->open(archives_dir, archive_id);
if (false
== search_archive(command_line_arguments, archive_reader, expr, reducer_socket_fd))
{
Expand All @@ -347,7 +345,8 @@ int main(int argc, char const* argv[]) {
continue;
}

archive_reader->open(entry.path());
auto const archive_id = entry.path().filename().string();
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
archive_reader->open(archives_dir, archive_id);
if (false
== search_archive(
command_line_arguments,
Expand Down
7 changes: 4 additions & 3 deletions components/core/src/clp_s/search/Output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ bool Output::filter() {
populate_string_queries(top_level_expr);

std::string message;
auto const archive_id = m_archive_reader->get_archive_id();
for (int32_t schema_id : matched_schemas) {
m_expr_clp_query.clear();
m_expr_var_match_map.clear();
Expand All @@ -85,15 +86,15 @@ bool Output::filter() {

auto& reader = m_archive_reader->read_table(
schema_id,
m_output_handler->should_output_timestamp(),
m_output_handler->should_output_metadata(),
m_should_marshal_records
);
reader.initialize_filter(this);

if (m_output_handler->should_output_timestamp()) {
if (m_output_handler->should_output_metadata()) {
epochtime_t timestamp;
while (reader.get_next_message_with_timestamp(message, timestamp, this)) {
m_output_handler->write(message, timestamp);
m_output_handler->write(message, timestamp, archive_id);
}
} else {
while (reader.get_next_message(message, this)) {
Expand Down
50 changes: 30 additions & 20 deletions components/core/src/clp_s/search/OutputHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "OutputHandler.hpp"

#include <sstream>
#include <string>
#include <string_view>

#include <spdlog/spdlog.h>

Expand All @@ -9,9 +11,12 @@
#include "../../reducer/network_utils.hpp"
#include "../../reducer/Record.hpp"

using std::string;
using std::string_view;

namespace clp_s::search {
NetworkOutputHandler::NetworkOutputHandler(
std::string const& host,
string const& host,
int port,
bool should_output_timestamp
)
Expand All @@ -23,18 +28,14 @@ NetworkOutputHandler::NetworkOutputHandler(
}
}

void NetworkOutputHandler::write(std::string const& message, epochtime_t timestamp) {
msgpack::type::tuple<epochtime_t, std::string, std::string> src(timestamp, message, "");
msgpack::sbuffer m;
msgpack::pack(m, src);

if (-1 == send(m_socket_fd, m.data(), m.size(), 0)) {
throw OperationFailed(ErrorCode::ErrorCodeFailureNetwork, __FILE__, __LINE__);
}
}

void NetworkOutputHandler::write(std::string const& message) {
msgpack::type::tuple<epochtime_t, std::string, std::string> src(0, message, "");
void NetworkOutputHandler::write(
string_view message,
epochtime_t timestamp,
string_view archive_id
) {
static constexpr string_view cOrigFilePathPlaceholder{""};
msgpack::type::tuple<epochtime_t, string, string, string> const
src(timestamp, message, cOrigFilePathPlaceholder, archive_id);
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
msgpack::sbuffer m;
msgpack::pack(m, src);

Expand All @@ -44,8 +45,8 @@ void NetworkOutputHandler::write(std::string const& message) {
}

ResultsCacheOutputHandler::ResultsCacheOutputHandler(
std::string const& uri,
std::string const& collection,
string const& uri,
string const& collection,
uint64_t batch_size,
uint64_t max_num_results,
bool should_output_timestamp
Expand Down Expand Up @@ -73,7 +74,8 @@ ErrorCode ResultsCacheOutputHandler::flush() {
m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp("original_path", std::move(result.original_path)),
bsoncxx::builder::basic::kvp("message", std::move(result.message)),
bsoncxx::builder::basic::kvp("timestamp", result.timestamp)
bsoncxx::builder::basic::kvp("timestamp", result.timestamp),
bsoncxx::builder::basic::kvp("archive_id", std::move(result.archive_id))
)));
count++;

Expand All @@ -98,12 +100,20 @@ ErrorCode ResultsCacheOutputHandler::flush() {
return ErrorCode::ErrorCodeSuccess;
}

void ResultsCacheOutputHandler::write(std::string const& message, epochtime_t timestamp) {
void ResultsCacheOutputHandler::write(
string_view message,
epochtime_t timestamp,
string_view archive_id
) {
if (m_latest_results.size() < m_max_num_results) {
m_latest_results.emplace(std::make_unique<QueryResult>("", message, timestamp));
m_latest_results.emplace(
std::make_unique<QueryResult>(string_view{}, message, timestamp, archive_id)
);
} else if (m_latest_results.top()->timestamp < timestamp) {
m_latest_results.pop();
m_latest_results.emplace(std::make_unique<QueryResult>("", message, timestamp));
m_latest_results.emplace(
std::make_unique<QueryResult>(string_view{}, message, timestamp, archive_id)
);
}
}

Expand All @@ -114,7 +124,7 @@ CountOutputHandler::CountOutputHandler(int reducer_socket_fd)
m_pipeline.add_pipeline_stage(std::make_shared<reducer::CountOperator>());
}

void CountOutputHandler::write(std::string const& message) {
void CountOutputHandler::write(string_view message) {
m_pipeline.push_record(reducer::EmptyRecord{});
}

Expand Down
Loading
Loading