Skip to content

Commit

Permalink
clp-s: Add archive_id to search results. (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 authored Jun 14, 2024
1 parent ccfb820 commit eedbd20
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 73 deletions.
28 changes: 18 additions & 10 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
#include "ArchiveReader.hpp"

#include <filesystem>
#include <string_view>

#include "archive_constants.hpp"
#include "ReaderUtils.hpp"

using std::string_view;

namespace clp_s {
/**
 * Opens the archive stored at <archives_dir>/<archive_id> for reading.
 *
 * Records the archive ID, obtains the variable, log-type, array and timestamp dictionary readers,
 * reads the schema tree and schema map, and opens the tables and table-metadata files.
 *
 * @param archives_dir Directory that contains the archive.
 * @param archive_id Name of the archive inside archives_dir; it is also kept in m_archive_id.
 * @throws OperationFailed with ErrorCodeNotReady if this reader is already open.
 *
 * NOTE(review): m_is_open is set to true before any reader is created or file is opened, so if
 * one of the later calls throws, the flag is left set by this function -- confirm that callers
 * (or close()) handle that case.
 */
void ArchiveReader::open(std::string const& archive_path) {
void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
if (m_is_open) {
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
}
m_is_open = true;
m_archive_path = archive_path;
m_archive_id = archive_id;
// Build <archives_dir>/<archive_id>; every archive file below is resolved against this path.
std::filesystem::path archive_path{archives_dir};
archive_path /= m_archive_id;
auto const archive_path_str = archive_path.string();

m_var_dict = ReaderUtils::get_variable_dictionary_reader(m_archive_path);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(m_archive_path);
m_array_dict = ReaderUtils::get_array_dictionary_reader(m_archive_path);
m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(m_archive_path);
m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str);
m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str);
m_array_dict = ReaderUtils::get_array_dictionary_reader(archive_path_str);
m_timestamp_dict = ReaderUtils::get_timestamp_dictionary_reader(archive_path_str);

m_schema_tree = ReaderUtils::read_schema_tree(m_archive_path);
m_schema_map = ReaderUtils::read_schemas(m_archive_path);
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

// The file-name constants are appended by plain string concatenation; no separator is added
// here, so presumably the constants begin with one -- confirm in archive_constants.hpp.
m_tables_file_reader.open(m_archive_path + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(m_archive_path + constants::cArchiveTableMetadataFile);
m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile);
m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
}

void ArchiveReader::read_metadata() {
Expand Down
11 changes: 7 additions & 4 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <map>
#include <set>
#include <span>
#include <string_view>
#include <utility>

#include <boost/filesystem.hpp>
Expand All @@ -29,9 +30,10 @@ class ArchiveReader {

/**
* Opens an archive for reading.
* @param archive_path
* @param archives_dir Directory that contains the archive.
* @param archive_id Name of the archive inside archives_dir; the archive is read from
* <archives_dir>/<archive_id>.
* @throws OperationFailed if the reader is already open.
*/
void open(std::string const& archive_path);
void open(std::string_view archives_dir, std::string_view archive_id);

/**
* Reads the dictionaries and metadata.
Expand Down Expand Up @@ -92,6 +94,8 @@ class ArchiveReader {
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);

/**
 * @return A view of the archive ID stored by open(). The view refers to this reader's
 * m_archive_id string, so it is only valid while the reader exists and until open() assigns a
 * new ID.
 */
std::string_view get_archive_id() { return m_archive_id; }

std::shared_ptr<VariableDictionaryReader> get_variable_dictionary() { return m_var_dict; }

std::shared_ptr<LogTypeDictionaryReader> get_log_type_dictionary() { return m_log_dict; }
Expand Down Expand Up @@ -161,8 +165,7 @@ class ArchiveReader {
);

bool m_is_open;
std::string m_archive_path;

std::string m_archive_id;
std::shared_ptr<VariableDictionaryReader> m_var_dict;
std::shared_ptr<LogTypeDictionaryReader> m_log_dict;
std::shared_ptr<LogTypeDictionaryReader> m_array_dict;
Expand Down
11 changes: 7 additions & 4 deletions components/core/src/clp_s/JsonConstructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
namespace clp_s {
JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
: m_output_dir(option.output_dir),
m_archives_dir(option.archives_dir) {
m_archives_dir(option.archives_dir),
m_archive_id(option.archive_id) {
std::error_code error_code;
if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) {
throw OperationFailed(
Expand All @@ -28,12 +29,14 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
);
}

if (false == std::filesystem::is_directory(m_archives_dir)) {
std::filesystem::path archive_path{m_archives_dir};
archive_path /= m_archive_id;
if (false == std::filesystem::is_directory(archive_path)) {
throw OperationFailed(
ErrorCodeFailure,
__FILENAME__,
__LINE__,
fmt::format("'{}' is not a directory", m_archives_dir)
fmt::format("'{}' is not a directory", archive_path.c_str())
);
}
}
Expand All @@ -43,7 +46,7 @@ void JsonConstructor::store() {
writer.open(m_output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending);

m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_archives_dir);
m_archive_reader->open(m_archives_dir, m_archive_id);
m_archive_reader->read_dictionaries_and_metadata();
m_archive_reader->store(writer);
m_archive_reader->close();
Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp_s/JsonConstructor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
namespace clp_s {
/**
 * Options consumed by JsonConstructor.
 */
struct JsonConstructorOption {
    // Directory under which the archive named by archive_id is looked up.
    std::string archives_dir{};

    // Name of the archive to decompress; <archives_dir>/<archive_id> must be a directory.
    std::string archive_id{};

    // Directory that receives the decompressed output; JsonConstructor creates it if needed.
    std::string output_dir{};
};

Expand Down Expand Up @@ -50,6 +51,7 @@ class JsonConstructor {

private:
std::string m_archives_dir;
std::string m_archive_id;
std::string m_output_dir;

std::unique_ptr<ArchiveReader> m_archive_reader;
Expand Down
13 changes: 6 additions & 7 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,11 @@ int main(int argc, char const* argv[]) {

clp_s::JsonConstructorOption option;
option.output_dir = command_line_arguments.get_output_dir();
option.archives_dir = archives_dir;
try {
auto const& archive_id = command_line_arguments.get_archive_id();
if (false == archive_id.empty()) {
option.archives_dir = std::filesystem::path{archives_dir} / archive_id;
option.archive_id = archive_id;
decompress_archive(option);
} else {
for (auto const& entry : std::filesystem::directory_iterator(archives_dir)) {
Expand All @@ -283,7 +284,7 @@ int main(int argc, char const* argv[]) {
continue;
}

option.archives_dir = entry.path();
option.archive_id = entry.path().filename();
decompress_archive(option);
}
}
Expand Down Expand Up @@ -330,10 +331,7 @@ int main(int argc, char const* argv[]) {
auto const& archive_id = command_line_arguments.get_archive_id();
auto archive_reader = std::make_shared<clp_s::ArchiveReader>();
if (false == archive_id.empty()) {
std::filesystem::path const archives_dir_path{archives_dir};
std::string const archive_path{archives_dir_path / archive_id};

archive_reader->open(archive_path);
archive_reader->open(archives_dir, archive_id);
if (false
== search_archive(command_line_arguments, archive_reader, expr, reducer_socket_fd))
{
Expand All @@ -347,7 +345,8 @@ int main(int argc, char const* argv[]) {
continue;
}

archive_reader->open(entry.path());
auto const archive_id = entry.path().filename().string();
archive_reader->open(archives_dir, archive_id);
if (false
== search_archive(
command_line_arguments,
Expand Down
7 changes: 4 additions & 3 deletions components/core/src/clp_s/search/Output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ bool Output::filter() {
populate_string_queries(top_level_expr);

std::string message;
auto const archive_id = m_archive_reader->get_archive_id();
for (int32_t schema_id : matched_schemas) {
m_expr_clp_query.clear();
m_expr_var_match_map.clear();
Expand All @@ -85,15 +86,15 @@ bool Output::filter() {

auto& reader = m_archive_reader->read_table(
schema_id,
m_output_handler->should_output_timestamp(),
m_output_handler->should_output_metadata(),
m_should_marshal_records
);
reader.initialize_filter(this);

if (m_output_handler->should_output_timestamp()) {
if (m_output_handler->should_output_metadata()) {
epochtime_t timestamp;
while (reader.get_next_message_with_timestamp(message, timestamp, this)) {
m_output_handler->write(message, timestamp);
m_output_handler->write(message, timestamp, archive_id);
}
} else {
while (reader.get_next_message(message, this)) {
Expand Down
50 changes: 30 additions & 20 deletions components/core/src/clp_s/search/OutputHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "OutputHandler.hpp"

#include <sstream>
#include <string>
#include <string_view>

#include <spdlog/spdlog.h>

Expand All @@ -9,9 +11,12 @@
#include "../../reducer/network_utils.hpp"
#include "../../reducer/Record.hpp"

using std::string;
using std::string_view;

namespace clp_s::search {
NetworkOutputHandler::NetworkOutputHandler(
std::string const& host,
string const& host,
int port,
bool should_output_timestamp
)
Expand All @@ -23,18 +28,14 @@ NetworkOutputHandler::NetworkOutputHandler(
}
}

void NetworkOutputHandler::write(std::string const& message, epochtime_t timestamp) {
msgpack::type::tuple<epochtime_t, std::string, std::string> src(timestamp, message, "");
msgpack::sbuffer m;
msgpack::pack(m, src);

if (-1 == send(m_socket_fd, m.data(), m.size(), 0)) {
throw OperationFailed(ErrorCode::ErrorCodeFailureNetwork, __FILE__, __LINE__);
}
}

void NetworkOutputHandler::write(std::string const& message) {
msgpack::type::tuple<epochtime_t, std::string, std::string> src(0, message, "");
void NetworkOutputHandler::write(
string_view message,
epochtime_t timestamp,
string_view archive_id
) {
static constexpr string_view cOrigFilePathPlaceholder{""};
msgpack::type::tuple<epochtime_t, string, string, string> const
src(timestamp, message, cOrigFilePathPlaceholder, archive_id);
msgpack::sbuffer m;
msgpack::pack(m, src);

Expand All @@ -44,8 +45,8 @@ void NetworkOutputHandler::write(std::string const& message) {
}

ResultsCacheOutputHandler::ResultsCacheOutputHandler(
std::string const& uri,
std::string const& collection,
string const& uri,
string const& collection,
uint64_t batch_size,
uint64_t max_num_results,
bool should_output_timestamp
Expand Down Expand Up @@ -73,7 +74,8 @@ ErrorCode ResultsCacheOutputHandler::flush() {
m_results.emplace_back(std::move(bsoncxx::builder::basic::make_document(
bsoncxx::builder::basic::kvp("original_path", std::move(result.original_path)),
bsoncxx::builder::basic::kvp("message", std::move(result.message)),
bsoncxx::builder::basic::kvp("timestamp", result.timestamp)
bsoncxx::builder::basic::kvp("timestamp", result.timestamp),
bsoncxx::builder::basic::kvp("archive_id", std::move(result.archive_id))
)));
count++;

Expand All @@ -98,12 +100,20 @@ ErrorCode ResultsCacheOutputHandler::flush() {
return ErrorCode::ErrorCodeSuccess;
}

/**
 * Buffers one search result, keeping at most m_max_num_results results in m_latest_results.
 *
 * While fewer than m_max_num_results results are held, the result is always added. Once that
 * limit is reached, the result replaces the current top() entry only if its timestamp is
 * strictly greater than that entry's timestamp; otherwise the result is dropped.
 * The first (original path) argument of each stored QueryResult is left empty.
 *
 * @param message
 * @param timestamp
 * @param archive_id Stored in the QueryResult alongside the message and timestamp.
 *
 * NOTE(review): this presumably keeps the results with the largest timestamps, which requires
 * top() to be the entry with the smallest timestamp -- confirm against the declaration of
 * m_latest_results.
 */
void ResultsCacheOutputHandler::write(std::string const& message, epochtime_t timestamp) {
void ResultsCacheOutputHandler::write(
string_view message,
epochtime_t timestamp,
string_view archive_id
) {
if (m_latest_results.size() < m_max_num_results) {
m_latest_results.emplace(std::make_unique<QueryResult>("", message, timestamp));
m_latest_results.emplace(
std::make_unique<QueryResult>(string_view{}, message, timestamp, archive_id)
);
} else if (m_latest_results.top()->timestamp < timestamp) {
// At capacity: evict the current top() entry to make room for the newer result.
m_latest_results.pop();
m_latest_results.emplace(std::make_unique<QueryResult>("", message, timestamp));
m_latest_results.emplace(
std::make_unique<QueryResult>(string_view{}, message, timestamp, archive_id)
);
}
}

Expand All @@ -114,7 +124,7 @@ CountOutputHandler::CountOutputHandler(int reducer_socket_fd)
m_pipeline.add_pipeline_stage(std::make_shared<reducer::CountOperator>());
}

/**
 * Registers one matching record by pushing an empty record into m_pipeline.
 * @param message Not used; only the fact that a record matched is passed on.
 */
void CountOutputHandler::write(std::string const& message) {
void CountOutputHandler::write(string_view message) {
m_pipeline.push_record(reducer::EmptyRecord{});
}

Expand Down
Loading

0 comments on commit eedbd20

Please sign in to comment.