diff --git a/.gitmodules b/.gitmodules index d48454341..213771efd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -11,3 +11,6 @@ [submodule "components/core/submodules/yaml-cpp"] path = components/core/submodules/yaml-cpp url = https://github.com/jbeder/yaml-cpp.git +[submodule "components/core/submodules/boost-outcome"] + path = components/core/submodules/boost-outcome + url = git@github.com:boostorg/outcome.git diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 14c0956fd..7828f819f 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -227,6 +227,15 @@ set(SOURCE_FILES_clp src/EncodedVariableInterpreter.cpp src/EncodedVariableInterpreter.hpp src/ErrorCode.hpp + src/ffi/encoding_methods.cpp + src/ffi/encoding_methods.hpp + src/ffi/encoding_methods.tpp + src/ffi/ir_stream/byteswap.hpp + src/ffi/ir_stream/decoding_methods.cpp + src/ffi/ir_stream/decoding_methods.hpp + src/ffi/ir_stream/decoding_methods.tpp + src/ffi/ir_stream/encoding_methods.cpp + src/ffi/ir_stream/encoding_methods.hpp src/FileReader.cpp src/FileReader.hpp src/FileWriter.cpp @@ -239,6 +248,11 @@ set(SOURCE_FILES_clp src/GlobalMySQLMetadataDB.hpp src/GlobalSQLiteMetadataDB.cpp src/GlobalSQLiteMetadataDB.hpp + src/ir/LogEvent.hpp + src/ir/LogEventDeserializer.cpp + src/ir/LogEventDeserializer.hpp + src/ir/utils.cpp + src/ir/utils.hpp src/LibarchiveFileReader.cpp src/LibarchiveFileReader.hpp src/LibarchiveReader.cpp @@ -397,6 +411,12 @@ set(SOURCE_FILES_clg src/EncodedVariableInterpreter.cpp src/EncodedVariableInterpreter.hpp src/ErrorCode.hpp + src/ffi/encoding_methods.cpp + src/ffi/encoding_methods.hpp + src/ffi/encoding_methods.tpp + src/ffi/ir_stream/decoding_methods.cpp + src/ffi/ir_stream/decoding_methods.hpp + src/ffi/ir_stream/decoding_methods.tpp src/FileReader.cpp src/FileReader.hpp src/FileWriter.cpp @@ -411,6 +431,7 @@ set(SOURCE_FILES_clg src/GlobalSQLiteMetadataDB.hpp src/Grep.cpp src/Grep.hpp + src/ir/LogEvent.hpp src/LogTypeDictionaryEntry.cpp src/LogTypeDictionaryEntry.hpp src/LogTypeDictionaryReader.cpp @@ -554,12 +575,19 @@ set(SOURCE_FILES_clo src/EncodedVariableInterpreter.cpp src/EncodedVariableInterpreter.hpp src/ErrorCode.hpp + src/ffi/encoding_methods.cpp + src/ffi/encoding_methods.hpp + src/ffi/encoding_methods.tpp + src/ffi/ir_stream/decoding_methods.cpp + src/ffi/ir_stream/decoding_methods.hpp + src/ffi/ir_stream/decoding_methods.tpp src/FileReader.cpp src/FileReader.hpp src/FileWriter.cpp src/FileWriter.hpp src/Grep.cpp src/Grep.hpp + src/ir/LogEvent.hpp src/LogTypeDictionaryEntry.cpp src/LogTypeDictionaryEntry.hpp src/LogTypeDictionaryReader.cpp @@ -726,6 +754,7 @@ set(SOURCE_FILES_unitTest src/ffi/ir_stream/byteswap.hpp src/ffi/ir_stream/decoding_methods.cpp src/ffi/ir_stream/decoding_methods.hpp + src/ffi/ir_stream/decoding_methods.tpp src/ffi/ir_stream/encoding_methods.cpp src/ffi/ir_stream/encoding_methods.hpp src/ffi/ir_stream/protocol_constants.hpp @@ -757,6 +786,11 @@ set(SOURCE_FILES_unitTest src/GlobalSQLiteMetadataDB.hpp src/Grep.cpp src/Grep.hpp + src/ir/LogEvent.hpp + src/ir/LogEventDeserializer.cpp + src/ir/LogEventDeserializer.hpp + src/ir/utils.cpp + src/ir/utils.hpp src/LibarchiveFileReader.cpp src/LibarchiveFileReader.hpp src/LibarchiveReader.cpp diff --git a/components/core/src/EncodedVariableInterpreter.cpp b/components/core/src/EncodedVariableInterpreter.cpp index 3545fce30..246897ea7 100644 --- a/components/core/src/EncodedVariableInterpreter.cpp +++ b/components/core/src/EncodedVariableInterpreter.cpp @@ -7,6 +7,7 @@ // Project headers #include "Defs.h" #include "ffi/encoding_methods.hpp" +#include "ffi/ir_stream/decoding_methods.hpp" #include "spdlog_with_specializations.hpp" #include "string_utils.hpp" #include "type_utils.hpp" @@ -207,24 +208,79 @@ void EncodedVariableInterpreter::encode_and_add_to_dictionary (const string& mes // To avoid reallocating the logtype as we append to it, reserve enough space to hold the entire message logtype_dict_entry.reserve_constant_length(message.length()); while (logtype_dict_entry.parse_next_var(message, var_begin_pos, var_end_pos, var_str)) { - // Encode variable - encoded_variable_t encoded_var; - if (convert_string_to_representable_integer_var(var_str, encoded_var)) { - logtype_dict_entry.add_int_var(); - } else if (convert_string_to_representable_float_var(var_str, encoded_var)) { - logtype_dict_entry.add_float_var(); - } else { - // Variable string looks like a dictionary variable, so encode it as so - variable_dictionary_id_t id; - var_dict.add_entry(var_str, id); - encoded_var = encode_var_dict_id(id); - var_ids.push_back(id); + auto encoded_var = encode_var(var_str, logtype_dict_entry, var_dict, var_ids); + encoded_vars.push_back(encoded_var); + } +} - logtype_dict_entry.add_dictionary_var(); - } +template +void EncodedVariableInterpreter::encode_and_add_to_dictionary( + ir::LogEvent const& log_event, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + std::vector& encoded_vars, + std::vector& var_ids, + size_t& raw_num_bytes +) { + logtype_dict_entry.clear(); + logtype_dict_entry.reserve_constant_length(log_event.get_logtype().length()); + raw_num_bytes = 0; + + auto constant_handler = [&](std::string const& value, size_t begin_pos, size_t length) { + raw_num_bytes += length; + logtype_dict_entry.add_constant(value, begin_pos, length); + }; + + auto encoded_int_handler = [&](encoded_variable_t encoded_var) { + raw_num_bytes += ffi::decode_integer_var(encoded_var).length(); + logtype_dict_entry.add_int_var(); + + ffi::eight_byte_encoded_variable_t eight_byte_encoded_var{}; + if constexpr (std::is_same_v) { + eight_byte_encoded_var = encoded_var; + } else { // std::is_same_v + eight_byte_encoded_var = ffi::encode_four_byte_integer_as_eight_byte(encoded_var); + } + encoded_vars.push_back(eight_byte_encoded_var); + }; + + auto encoded_float_handler = [&](ffi::four_byte_encoded_variable_t encoded_var) { + raw_num_bytes += ffi::decode_float_var(encoded_var).length(); + logtype_dict_entry.add_float_var(); + + ffi::eight_byte_encoded_variable_t eight_byte_encoded_var{}; + if constexpr (std::is_same_v) { + eight_byte_encoded_var = encoded_var; + } else { // std::is_same_v + eight_byte_encoded_var = ffi::encode_four_byte_float_as_eight_byte(encoded_var); + } + encoded_vars.push_back(eight_byte_encoded_var); + }; + + auto dict_var_handler = [&](string const& dict_var) { + raw_num_bytes += dict_var.length(); + + ffi::eight_byte_encoded_variable_t encoded_var{}; + if constexpr (std::is_same_v) { + encoded_var = encode_var_dict_id( + add_dict_var(dict_var, logtype_dict_entry, var_dict, var_ids) + ); + } else { // std::is_same_v + encoded_var = encode_var(dict_var, logtype_dict_entry, var_dict, var_ids); + } encoded_vars.push_back(encoded_var); - } + }; + + ffi::ir_stream::generic_decode_message( + log_event.get_logtype(), + log_event.get_encoded_vars(), + log_event.get_dict_vars(), + constant_handler, + encoded_int_handler, + encoded_float_handler, + dict_var_handler + ); } bool EncodedVariableInterpreter::decode_variables_into_message (const LogTypeDictionaryEntry& logtype_dict_entry, const VariableDictionaryReader& var_dict, @@ -337,3 +393,58 @@ bool EncodedVariableInterpreter::wildcard_search_dictionary_and_get_encoded_matc encoded_variable_t EncodedVariableInterpreter::encode_var_dict_id (variable_dictionary_id_t id) { return bit_cast(id); } + +encoded_variable_t EncodedVariableInterpreter::encode_var( + string const& var, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + vector& var_ids +) { + encoded_variable_t encoded_var{0}; + if (convert_string_to_representable_integer_var(var, encoded_var)) { + logtype_dict_entry.add_int_var(); + } else if (convert_string_to_representable_float_var(var, encoded_var)) { + logtype_dict_entry.add_float_var(); + } else { + // Variable string looks like a dictionary variable, so encode it as so + encoded_var = encode_var_dict_id(add_dict_var(var, logtype_dict_entry, var_dict, var_ids)); + } + return encoded_var; +} + +variable_dictionary_id_t EncodedVariableInterpreter::add_dict_var( + string const& var, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + vector& var_ids +) { + variable_dictionary_id_t id{cVariableDictionaryIdMax}; + var_dict.add_entry(var, id); + var_ids.push_back(id); + + logtype_dict_entry.add_dictionary_var(); + + return id; +} + +// Explicitly declare template specializations so that we can define the +// template methods in this file +template +void EncodedVariableInterpreter::encode_and_add_to_dictionary( + ir::LogEvent const& log_event, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + std::vector& encoded_vars, + std::vector& var_ids, + size_t& raw_num_bytes +); + +template +void EncodedVariableInterpreter::encode_and_add_to_dictionary( + ir::LogEvent const& log_event, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + std::vector& encoded_vars, + std::vector& var_ids, + size_t& raw_num_bytes +); diff --git a/components/core/src/EncodedVariableInterpreter.hpp b/components/core/src/EncodedVariableInterpreter.hpp index f0a71d3bc..d33cb7ec4 100644 --- a/components/core/src/EncodedVariableInterpreter.hpp +++ b/components/core/src/EncodedVariableInterpreter.hpp @@ -6,6 +6,7 @@ #include // Project headers +#include "ir/LogEvent.hpp" #include "Query.hpp" #include "TraceableException.hpp" #include "VariableDictionaryReader.hpp" @@ -73,6 +74,32 @@ class EncodedVariableInterpreter { */ static void encode_and_add_to_dictionary (const std::string& message, LogTypeDictionaryEntry& logtype_dict_entry, VariableDictionaryWriter& var_dict, std::vector& encoded_vars, std::vector& var_ids); + + /** + * Encodes the given IR log event, constructing a logtype dictionary entry, + * and adding any dictionary variables to the dictionary. NOTE: Four-byte + * encoded variables will be converted to eight-byte encoded variables. + * @tparam encoded_variable_t The type of the encoded variables in the log + * event + * @param log_event + * @param logtype_dict_entry + * @param var_dict + * @param encoded_vars A container to store the encoded variables in + * @param var_ids A container to store the dictionary IDs for dictionary + * variables + * @param raw_num_bytes Returns an estimate of the number of bytes that + * this log event would occupy if it was not encoded in CLP's IR + */ + template + static void encode_and_add_to_dictionary( + ir::LogEvent const& log_event, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + std::vector& encoded_vars, + std::vector& var_ids, + size_t& raw_num_bytes + ); + /** * Decodes all variables and decompresses them into a message * @param logtype_dict_entry @@ -106,6 +133,40 @@ class EncodedVariableInterpreter { */ static bool wildcard_search_dictionary_and_get_encoded_matches (const std::string& var_wildcard_str, const VariableDictionaryReader& var_dict, bool ignore_case, SubQuery& sub_query); + +private: + /** + * Encodes the given string as a dictionary or non-dictionary variable and + * adds a corresponding placeholder to the logtype + * @param var + * @param logtype_dict_entry + * @param var_dict + * @param var_ids A container to add the dictionary ID to (if the string is + * a dictionary variable) + * @return The encoded variable + */ + static encoded_variable_t encode_var( + std::string const& var, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + std::vector& var_ids + ); + + /** + * Adds the given string to the variable dictionary and adds a corresponding + * placeholder to logtype + * @param var + * @param logtype_dict_entry + * @param var_dict + * @param var_ids A container to add the dictionary ID to + * @return The dictionary ID + */ + static variable_dictionary_id_t add_dict_var( + std::string const& var, + LogTypeDictionaryEntry& logtype_dict_entry, + VariableDictionaryWriter& var_dict, + std::vector& var_ids + ); }; #endif // ENCODEDVARIABLEINTERPRETER_HPP diff --git a/components/core/src/clp/FileCompressor.cpp b/components/core/src/clp/FileCompressor.cpp index b64e1f951..88db676ee 100644 --- a/components/core/src/clp/FileCompressor.cpp +++ b/components/core/src/clp/FileCompressor.cpp @@ -6,15 +6,20 @@ #include // Boost libraries +#include #include // libarchive #include // Project headers +#include "../ffi/ir_stream/decoding_methods.hpp" +#include "../ir/utils.hpp" #include "../Profiler.hpp" #include "utils.hpp" +using ir::has_ir_stream_magic_number; +using ir::LogEventDeserializer; using std::cout; using std::endl; using std::set; @@ -279,8 +284,9 @@ namespace clp { utf8_validation_buf, utf8_validation_buf_len ); + string file_path{m_libarchive_reader.get_path()}; if (is_utf8_sequence(utf8_validation_buf_len, utf8_validation_buf)) { - auto boost_path_for_compression = parent_boost_path / m_libarchive_reader.get_path(); + auto boost_path_for_compression = parent_boost_path / file_path; if (use_heuristic) { parse_and_encode_with_heuristic(target_data_size_of_dicts, archive_user_config, target_encoded_file_size, boost_path_for_compression.string(), file_to_compress.get_group_id(), archive_writer, @@ -289,8 +295,27 @@ namespace clp { parse_and_encode(target_data_size_of_dicts, archive_user_config, target_encoded_file_size, boost_path_for_compression.string(), file_to_compress.get_group_id(), archive_writer, m_libarchive_file_reader); } + } else if (has_ir_stream_magic_number({utf8_validation_buf, utf8_validation_buf_len})) { + // Remove .clp suffix if found + static constexpr char cIrStreamExtension[] = ".clp"; + if (boost::iends_with(file_path, cIrStreamExtension)) { + file_path.resize(file_path.length() - strlen(cIrStreamExtension)); + } + auto boost_path_for_compression = parent_boost_path / file_path; + + if (false == compress_ir_stream( + target_data_size_of_dicts, + archive_user_config, + target_encoded_file_size, + boost_path_for_compression.string(), + file_to_compress.get_group_id(), + archive_writer, + m_libarchive_file_reader + )) { + succeeded = false; + } } else { - SPDLOG_ERROR("Cannot compress {} - not UTF-8 encoded.", m_libarchive_reader.get_path()); + SPDLOG_ERROR("Cannot compress {} - not an IR stream or UTF-8 encoded", file_path); succeeded = false; } @@ -302,4 +327,160 @@ namespace clp { return succeeded; } + + bool FileCompressor::compress_ir_stream( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + string const& path, + group_id_t group_id, + streaming_archive::writer::Archive& archive_writer, + ReaderInterface& reader + ) { + bool uses_four_byte_encoding{false}; + auto ir_error_code = ffi::ir_stream::get_encoding_type(reader, uses_four_byte_encoding); + if (ffi::ir_stream::IRErrorCode_Success != ir_error_code) { + SPDLOG_ERROR("Cannot compress {}, IR error={}", path, static_cast(ir_error_code)); + return false; + } + + try { + std::error_code error_code{}; + if (uses_four_byte_encoding) { + auto result + = LogEventDeserializer::create(reader); + if (result.has_error()) { + error_code = result.error(); + } else { + error_code = compress_ir_stream_by_encoding( + target_data_size_of_dicts, + archive_user_config, + target_encoded_file_size, + path, + group_id, + archive_writer, + result.value() + ); + } + } else { + auto result + = LogEventDeserializer::create(reader); + if (result.has_error()) { + error_code = result.error(); + } else { + error_code = compress_ir_stream_by_encoding( + target_data_size_of_dicts, + archive_user_config, + target_encoded_file_size, + path, + group_id, + archive_writer, + result.value() + ); + } + } + if (0 != error_code.value()) { + SPDLOG_ERROR( + "Failed to compress {} - {}:{}", + error_code.category().name(), + error_code.message() + ); + return false; + } + } catch (TraceableException& e) { + auto error_code = e.get_error_code(); + if (ErrorCode_errno == error_code) { + SPDLOG_ERROR( + "Failed to compress {} - {}:{} {}, errno={}", + path, + e.get_filename(), + e.get_line_number(), + e.what(), + errno + ); + } else { + SPDLOG_ERROR( + "Failed to compress {} - {}:{} {}, error_code={}", + path, + e.get_filename(), + e.get_line_number(), + e.what(), + error_code + ); + } + return false; + } + + return true; + } + + template + std::error_code FileCompressor::compress_ir_stream_by_encoding( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + string const& path, + group_id_t group_id, + streaming_archive::writer::Archive& archive, + LogEventDeserializer& log_event_deserializer + ) { + archive.create_and_open_file(path, group_id, m_uuid_generator(), 0); + + // We assume an IR stream only has one timestamp pattern + auto timestamp_pattern = log_event_deserializer.get_timestamp_pattern(); + archive.change_ts_pattern(×tamp_pattern); + + std::error_code error_code{}; + while (true) { + auto result = log_event_deserializer.deserialize_log_event(); + if (result.has_error()) { + auto error = result.error(); + if (std::errc::no_message_available != error) { + error_code = error; + } + break; + } + + // Split archive/encoded file if necessary before writing the new event + if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts) { + split_file_and_archive( + archive_user_config, + path, + group_id, + ×tamp_pattern, + archive + ); + } else if (archive.get_file().get_encoded_size_in_bytes() >= target_encoded_file_size) { + split_file(path, group_id, ×tamp_pattern, archive); + } + + archive.write_log_event_ir(result.value()); + } + + close_file_and_append_to_segment(archive); + return error_code; + } + + // Explicitly declare template specializations so that we can define the + // template methods in this file + template std::error_code + FileCompressor::compress_ir_stream_by_encoding( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + string const& path, + group_id_t group_id, + streaming_archive::writer::Archive& archive, + LogEventDeserializer& log_event_deserializer + ); + template std::error_code + FileCompressor::compress_ir_stream_by_encoding( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + string const& path, + group_id_t group_id, + streaming_archive::writer::Archive& archive, + LogEventDeserializer& log_event_deserializer + ); } diff --git a/components/core/src/clp/FileCompressor.hpp b/components/core/src/clp/FileCompressor.hpp index 2db87c9d2..7d87e12db 100644 --- a/components/core/src/clp/FileCompressor.hpp +++ b/components/core/src/clp/FileCompressor.hpp @@ -1,22 +1,24 @@ #ifndef CLP_FILECOMPRESSOR_HPP #define CLP_FILECOMPRESSOR_HPP +// C++ standard libraries +#include + // Boost libraries #include // Project headers #include "../BufferedFileReader.hpp" +#include "../compressor_frontend/LogParser.hpp" +#include "../ir/LogEventDeserializer.hpp" #include "../LibarchiveFileReader.hpp" #include "../LibarchiveReader.hpp" #include "../MessageParser.hpp" #include "../ParsedMessage.hpp" #include "../streaming_archive/writer/Archive.hpp" #include "FileToCompress.hpp" -#include "../compressor_frontend/LogParser.hpp" namespace clp { - constexpr size_t cUtf8ValidationBufCapacity = 4096; - /** * Class to parse and compress a file into a streaming archive */ @@ -75,6 +77,51 @@ namespace clp { size_t target_encoded_file_size, const FileToCompress& file_to_compress, streaming_archive::writer::Archive& archive_writer, bool use_heuristic); + /** + * Compresses the IR stream from the given reader into the archive + * @param target_data_size_of_dicts + * @param archive_user_config + * @param target_encoded_file_size + * @param path + * @param group_id + * @param archive_writer + * @param reader + * @return Whether the IR stream was compressed successfully + */ + bool compress_ir_stream( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + std::string const& path, + group_id_t group_id, + streaming_archive::writer::Archive& archive_writer, + ReaderInterface& reader + ); + + /** + * Compresses an IR stream using the eight-byte or four-byte encoding + * based on the given template parameter. + * @tparam encoded_variable_t + * @param target_data_size_of_dicts + * @param archive_user_config + * @param target_encoded_file_size + * @param path + * @param group_id + * @param archive + * @param log_event_deserializer + * @return An error code + */ + template + std::error_code compress_ir_stream_by_encoding( + size_t target_data_size_of_dicts, + streaming_archive::writer::Archive::UserConfig& archive_user_config, + size_t target_encoded_file_size, + std::string const& path, + group_id_t group_id, + streaming_archive::writer::Archive& archive, + ir::LogEventDeserializer& log_event_deserializer + ); + // Variables boost::uuids::random_generator& m_uuid_generator; BufferedFileReader m_file_reader; diff --git a/components/core/src/ffi/encoding_methods.cpp b/components/core/src/ffi/encoding_methods.cpp index 9b2b3441f..9a7b17d36 100644 --- a/components/core/src/ffi/encoding_methods.cpp +++ b/components/core/src/ffi/encoding_methods.cpp @@ -112,4 +112,28 @@ bool get_bounds_of_next_var( return (msg_length != begin_pos); } + +eight_byte_encoded_variable_t encode_four_byte_float_as_eight_byte( + four_byte_encoded_variable_t four_byte_encoded_var +) { + uint8_t decimal_point_pos{}; + uint8_t num_digits{}; + uint32_t digits{}; + bool is_negative{}; + decode_float_properties( + four_byte_encoded_var, is_negative, digits, num_digits, decimal_point_pos); + + return encode_float_properties( + is_negative, + digits, + num_digits, + decimal_point_pos + ); +} + +eight_byte_encoded_variable_t encode_four_byte_integer_as_eight_byte( + four_byte_encoded_variable_t four_byte_encoded_var +) { + return static_cast(four_byte_encoded_var); +} } // namespace ffi diff --git a/components/core/src/ffi/encoding_methods.hpp b/components/core/src/ffi/encoding_methods.hpp index ee24ca73d..52d05554f 100644 --- a/components/core/src/ffi/encoding_methods.hpp +++ b/components/core/src/ffi/encoding_methods.hpp @@ -132,6 +132,15 @@ bool get_bounds_of_next_var( template bool encode_float_string(std::string_view str, encoded_variable_t& encoded_var); +/** + * Encodes the given four-byte encoded float using the eight-byte encoding + * @param four_byte_encoded_var + * @return The float using the eight-byte encoding + */ +eight_byte_encoded_variable_t encode_four_byte_float_as_eight_byte( + four_byte_encoded_variable_t four_byte_encoded_var +); + /** * Encodes a float value with the given properties into an encoded variable * @tparam encoded_variable_t Type of the encoded variable @@ -154,6 +163,29 @@ encoded_variable_t encode_float_properties( size_t decimal_point_pos ); +/** + * Decodes an encoded float variable into its properties + * @tparam encoded_variable_t Type of the encoded variable + * @param encoded_var + * @param is_negative Returns whether the float is negative + * @param digits Returns the digits of the float, ignoring the decimal, as an + * integer + * @param num_digits Returns the number of digits in \p digits + * @param decimal_point_pos Returns the position of the decimal point from the + * right of the value + */ +template +void decode_float_properties( + encoded_variable_t encoded_var, + bool& is_negative, + std::conditional_t< + std::is_same_v, + uint32_t, + uint64_t>& digits, + uint8_t& num_digits, + uint8_t& decimal_point_pos +); + /** * Decodes the given encoded float variable into a string * @tparam encoded_variable_t Type of the encoded variable @@ -172,6 +204,16 @@ std::string decode_float_var(encoded_variable_t encoded_var); */ template bool encode_integer_string(std::string_view str, encoded_variable_t& encoded_var); + +/** + * Encodes the given four-byte encoded integer using the eight-byte encoding + * @param four_byte_encoded_var + * @return The integer using the eight-byte encoding + */ +eight_byte_encoded_variable_t encode_four_byte_integer_as_eight_byte( + four_byte_encoded_variable_t four_byte_encoded_var +); + /** * Decodes the given encoded integer variable into a string * @tparam encoded_variable_t Type of the encoded variable diff --git a/components/core/src/ffi/encoding_methods.tpp b/components/core/src/ffi/encoding_methods.tpp index c0053adc1..eacd93de4 100644 --- a/components/core/src/ffi/encoding_methods.tpp +++ b/components/core/src/ffi/encoding_methods.tpp @@ -165,17 +165,16 @@ encoded_variable_t encode_float_properties( } template -std::string decode_float_var(encoded_variable_t encoded_var) { - std::string value; - - uint8_t decimal_point_pos; - uint8_t num_digits; - std::conditional_t< - std::is_same_v, - uint32_t, - uint64_t> - digits; - bool is_negative; +void decode_float_properties( + encoded_variable_t encoded_var, + bool& is_negative, + std::conditional_t< + std::is_same_v, + uint32_t, + uint64_t>& digits, + uint8_t& num_digits, + uint8_t& decimal_point_pos +) { static_assert( (std::is_same_v || std::is_same_v) @@ -216,6 +215,21 @@ std::string decode_float_var(encoded_variable_t encoded_var) { encoded_float >>= 25; is_negative = encoded_float > 0; } +} + +template +std::string decode_float_var(encoded_variable_t encoded_var) { + std::string value; + + uint8_t decimal_point_pos; + uint8_t num_digits; + std::conditional_t< + std::is_same_v, + uint32_t, + uint64_t> + digits; + bool is_negative; + decode_float_properties(encoded_var, is_negative, digits, num_digits, decimal_point_pos); if (num_digits < decimal_point_pos) { throw EncodingException( diff --git a/components/core/src/ffi/ir_stream/decoding_methods.cpp b/components/core/src/ffi/ir_stream/decoding_methods.cpp index 7e86df0b3..167fc77cd 100644 --- a/components/core/src/ffi/ir_stream/decoding_methods.cpp +++ b/components/core/src/ffi/ir_stream/decoding_methods.cpp @@ -69,7 +69,7 @@ parse_dictionary_var(ReaderInterface& reader, encoded_tag_t encoded_tag, string& * decode */ template -IRErrorCode +static IRErrorCode parse_timestamp(ReaderInterface& reader, encoded_tag_t encoded_tag, epoch_time_ms_t& ts); /** @@ -81,11 +81,9 @@ parse_timestamp(ReaderInterface& reader, encoded_tag_t encoded_tag, epoch_time_m * encoded_variable_t == four_byte_encoded_variable_t or the actual timestamp if * encoded_variable_t == eight_byte_encoded_variable_t * @return IRErrorCode_Success on success - * @return IRErrorCode_Corrupted_IR if reader contains invalid IR * @return IRErrorCode_Decode_Error if the encoded message cannot be properly * decoded - * @return IRErrorCode_Incomplete_IR if reader doesn't contain enough data to - * decode + * @return Same as ffi::ir_stream::deserialize_ir_message */ template static IRErrorCode @@ -105,25 +103,6 @@ generic_decode_next_message(ReaderInterface& reader, string& message, epoch_time static IRErrorCode read_metadata_info(ReaderInterface& reader, encoded_tag_t& metadata_type, uint16_t& metadata_size); -/** - * Decodes the message from the given logtype, encoded variables, and dictionary - * variables. This function properly handles escaped variable placeholders in - * the logtype, as opposed to ffi::decode_message that doesn't handle escaped - * placeholders for simplicity - * @tparam encoded_variable_t Type of the encoded variable - * @param logtype - * @param encoded_vars - * @param dictionary_vars - * @return The decoded message - * @throw EncodingException if the message can't be decoded - */ -template -static string decode_message( - string const& logtype, - vector const& encoded_vars, - vector const& dictionary_vars -); - template static bool is_variable_tag(encoded_tag_t tag, bool& is_encoded_var) { static_assert( @@ -239,7 +218,7 @@ parse_dictionary_var(ReaderInterface& reader, encoded_tag_t encoded_tag, string& } template -IRErrorCode +static IRErrorCode parse_timestamp(ReaderInterface& reader, encoded_tag_t encoded_tag, epoch_time_ms_t& ts) { static_assert( (is_same_v @@ -282,61 +261,43 @@ parse_timestamp(ReaderInterface& reader, encoded_tag_t encoded_tag, epoch_time_m template static IRErrorCode generic_decode_next_message(ReaderInterface& reader, string& message, epoch_time_ms_t& timestamp) { - encoded_tag_t encoded_tag; - if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { - return IRErrorCode_Incomplete_IR; - } - if (cProtocol::Eof == encoded_tag) { - return IRErrorCode_Eof; - } + message.clear(); - // Handle variables vector encoded_vars; vector dict_vars; - encoded_variable_t encoded_variable; - string var_str; - bool is_encoded_var; - while (is_variable_tag(encoded_tag, is_encoded_var)) { - if (is_encoded_var) { - if (false == decode_int(reader, encoded_variable)) { - return IRErrorCode_Incomplete_IR; - } - encoded_vars.push_back(encoded_variable); - } else { - if (auto error_code = parse_dictionary_var(reader, encoded_tag, var_str); - IRErrorCode_Success != error_code) - { - return error_code; - } - dict_vars.emplace_back(var_str); - } - if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { - return IRErrorCode_Incomplete_IR; - } - } - - // Handle logtype string logtype; - if (auto error_code = parse_logtype(reader, encoded_tag, logtype); + if (auto error_code + = deserialize_ir_message(reader, logtype, encoded_vars, dict_vars, timestamp); IRErrorCode_Success != error_code) { return error_code; } - // NOTE: for the eight-byte encoding, the timestamp is the actual timestamp; - // for the four-byte encoding, the timestamp is a timestamp delta - if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { - return IRErrorCode_Incomplete_IR; - } - if (auto error_code = parse_timestamp(reader, encoded_tag, timestamp); - IRErrorCode_Success != error_code) - { - return error_code; - } + auto constant_handler = [&](string const& value, size_t begin_pos, size_t length) { + message.append(value, begin_pos, length); + }; + + auto encoded_int_handler = [&](encoded_variable_t value) { + message.append(decode_integer_var(value)); + }; + + auto encoded_float_handler = [&](encoded_variable_t encoded_float) { + message.append(decode_float_var(encoded_float)); + }; + + auto dict_var_handler = [&](string const& dict_var) { message.append(dict_var); }; try { - message = decode_message(logtype, encoded_vars, dict_vars); - } catch (EncodingException const& e) { + generic_decode_message( + logtype, + encoded_vars, + dict_vars, + constant_handler, + encoded_int_handler, + encoded_float_handler, + dict_var_handler + ); + } catch (DecodingException const& e) { return IRErrorCode_Decode_Error; } return IRErrorCode_Success; @@ -375,120 +336,63 @@ read_metadata_info(ReaderInterface& reader, encoded_tag_t& metadata_type, uint16 } template -static string decode_message( - string const& logtype, - vector const& encoded_vars, - vector const& dictionary_vars -) { - string message; - size_t encoded_vars_length = encoded_vars.size(); - size_t dict_vars_length = dictionary_vars.size(); - size_t next_static_text_begin_pos = 0; - - size_t dictionary_vars_ix = 0; - size_t encoded_vars_ix = 0; - for (size_t cur_pos = 0; cur_pos < logtype.length(); ++cur_pos) { - auto c = logtype[cur_pos]; - switch (c) { - case enum_to_underlying_type(VariablePlaceholder::Float): { - message.append( - logtype, - next_static_text_begin_pos, - cur_pos - next_static_text_begin_pos - ); - next_static_text_begin_pos = cur_pos + 1; - if (encoded_vars_ix >= encoded_vars_length) { - throw EncodingException( - ErrorCode_Corrupt, - __FILENAME__, - __LINE__, - cTooFewEncodedVarsErrorMessage - ); - } - message.append(decode_float_var(encoded_vars[encoded_vars_ix])); - ++encoded_vars_ix; - - break; - } - - case enum_to_underlying_type(VariablePlaceholder::Integer): { - message.append( - logtype, - next_static_text_begin_pos, - cur_pos - next_static_text_begin_pos - ); - next_static_text_begin_pos = cur_pos + 1; - if (encoded_vars_ix >= encoded_vars_length) { - throw EncodingException( - ErrorCode_Corrupt, - __FILENAME__, - __LINE__, - cTooFewEncodedVarsErrorMessage - ); - } - message.append(decode_integer_var(encoded_vars[encoded_vars_ix])); - ++encoded_vars_ix; - - break; - } +auto deserialize_ir_message( + ReaderInterface& reader, + string& logtype, + vector& encoded_vars, + vector& dict_vars, + epoch_time_ms_t& timestamp_or_timestamp_delta +) -> IRErrorCode { + encoded_tag_t encoded_tag{cProtocol::Eof}; + if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { + return IRErrorCode_Incomplete_IR; + } + if (cProtocol::Eof == encoded_tag) { + return IRErrorCode_Eof; + } - case enum_to_underlying_type(VariablePlaceholder::Dictionary): { - message.append( - logtype, - next_static_text_begin_pos, - cur_pos - next_static_text_begin_pos - ); - next_static_text_begin_pos = cur_pos + 1; - if (dictionary_vars_ix >= dict_vars_length) { - throw EncodingException( - ErrorCode_Corrupt, - __FILENAME__, - __LINE__, - cTooFewDictionaryVarsErrorMessage - ); - } - message.append(dictionary_vars[dictionary_vars_ix]); - ++dictionary_vars_ix; - - break; + // Handle variables + string var_str; + bool is_encoded_var{false}; + while (is_variable_tag(encoded_tag, is_encoded_var)) { + if (is_encoded_var) { + encoded_variable_t encoded_variable; + if (false == decode_int(reader, encoded_variable)) { + return IRErrorCode_Incomplete_IR; } - - case cVariablePlaceholderEscapeCharacter: { - // Ensure the escape character is followed by a character that's - // being escaped - if (cur_pos == logtype.length() - 1) { - throw EncodingException( - ErrorCode_Corrupt, - __FILENAME__, - __LINE__, - cUnexpectedEscapeCharacterMessage - ); - } - message.append( - logtype, - next_static_text_begin_pos, - cur_pos - next_static_text_begin_pos - ); - - // Skip the escape character - next_static_text_begin_pos = cur_pos + 1; - // The character after the escape character is static text - // (regardless of whether it is a variable placeholder), so - // increment cur_pos by 1 to ensure we don't process the next - // character in any of the other cases (instead it will be added - // to the message). - ++cur_pos; - - break; + encoded_vars.push_back(encoded_variable); + } else { + if (auto error_code = parse_dictionary_var(reader, encoded_tag, var_str); + IRErrorCode_Success != error_code) + { + return error_code; } + dict_vars.emplace_back(var_str); + } + if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { + return IRErrorCode_Incomplete_IR; } } - // Add remainder - if (next_static_text_begin_pos < logtype.length()) { - message.append(logtype, next_static_text_begin_pos); + + // Handle logtype + if (auto error_code = parse_logtype(reader, encoded_tag, logtype); + IRErrorCode_Success != error_code) + { + return error_code; } - return message; + // NOTE: for the eight-byte encoding, the timestamp is the actual timestamp; + // for the four-byte encoding, the timestamp is a timestamp delta + if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) { + return IRErrorCode_Incomplete_IR; + } + if (auto error_code + = parse_timestamp(reader, encoded_tag, timestamp_or_timestamp_delta); + IRErrorCode_Success != error_code) + { + return error_code; + } + return IRErrorCode_Success; } IRErrorCode get_encoding_type(ReaderInterface& reader, bool& is_four_bytes_encoding) { @@ -578,4 +482,21 @@ namespace eight_byte_encoding { ); } } // namespace eight_byte_encoding + +// Explicitly declare specializations +template auto deserialize_ir_message( + ReaderInterface& reader, + string& logtype, + vector& encoded_vars, + vector& dict_vars, + epoch_time_ms_t& timestamp_or_timestamp_delta +) -> IRErrorCode; + +template auto deserialize_ir_message( + ReaderInterface& reader, + string& logtype, + vector& encoded_vars, + vector& dict_vars, + epoch_time_ms_t& timestamp_or_timestamp_delta +) -> IRErrorCode; } // namespace ffi::ir_stream diff --git a/components/core/src/ffi/ir_stream/decoding_methods.hpp b/components/core/src/ffi/ir_stream/decoding_methods.hpp index 85577b52b..cf4264111 100644 --- a/components/core/src/ffi/ir_stream/decoding_methods.hpp +++ b/components/core/src/ffi/ir_stream/decoding_methods.hpp @@ -1,7 +1,7 @@ #ifndef FFI_IR_STREAM_DECODING_METHODS_HPP #define FFI_IR_STREAM_DECODING_METHODS_HPP -#include +#include #include #include "../../ReaderInterface.hpp" @@ -18,6 +18,25 @@ typedef enum { IRErrorCode_Incomplete_IR, } IRErrorCode; +class DecodingException : public TraceableException { +public: + // Constructors + DecodingException( + ErrorCode error_code, + char const* const filename, + int line_number, + std::string message + ) + : TraceableException(error_code, filename, line_number), + m_message(std::move(message)) {} + + // Methods + [[nodiscard]] char const* what() const noexcept override { return m_message.c_str(); } + +private: + std::string m_message; +}; + /** * Decodes the encoding type for the encoded IR stream * @param reader @@ -29,6 +48,66 @@ typedef enum { */ IRErrorCode get_encoding_type(ReaderInterface& reader, bool& is_four_bytes_encoding); +/** + * Deserializes an IR message from the given stream + * @tparam encoded_variable_t + * @param reader + * @param logtype Returns the logtype + * @param encoded_vars Returns the encoded variables + * @param dict_vars Returns the dictionary variables + * @param timestamp_or_timestamp_delta Returns the timestamp (in the eight-byte + * encoding case) or the timestamp delta (in the four-byte encoding case) + * @return IRErrorCode_Success on success + * @return IRErrorCode_Corrupted_IR if reader contains invalid IR + * @return IRErrorCode_Incomplete_IR if reader doesn't contain enough data + * @return IRErrorCode_Eof on reaching the end of the stream + */ +template +auto deserialize_ir_message( + ReaderInterface& reader, + std::string& logtype, + std::vector& encoded_vars, + std::vector& dict_vars, + epoch_time_ms_t& timestamp_or_timestamp_delta +) -> IRErrorCode; + +/** + * Decodes the IR message calls the given methods to handle each component of + * the message + * @tparam encoded_variable_t Type of the encoded variable + * @tparam ConstantHandler Method to handle constants in the logtype. + * Signature: (const std::string&, size_t, size_t) -> void + * @tparam EncodedIntHandler Method to handle encoded integers. + * Signature: (encoded_variable_t) -> void + * @tparam EncodedFloatHandler Method to handle encoded floats. + * Signature: (encoded_variable_t) -> void + * @tparam DictVarHandler Method to handle dictionary variables. + * Signature: (const std::string&) -> void + * @param logtype + * @param encoded_vars + * @param dict_vars + * @param constant_handler + * @param encoded_int_handler + * @param encoded_float_handler + * @param dict_var_handler + * @throw DecodingException if the message can not be decoded properly + */ +template < + typename encoded_variable_t, + typename ConstantHandler, + typename EncodedIntHandler, + typename EncodedFloatHandler, + typename DictVarHandler> +void generic_decode_message( + std::string const& logtype, + std::vector const& encoded_vars, + std::vector const& dict_vars, + ConstantHandler constant_handler, + EncodedIntHandler encoded_int_handler, + EncodedFloatHandler encoded_float_handler, + DictVarHandler dict_var_handler +); + /** * Decodes the preamble for an IR stream. * @param reader @@ -103,4 +182,6 @@ namespace four_byte_encoding { } // namespace four_byte_encoding } // namespace ffi::ir_stream +#include "decoding_methods.tpp" + #endif // FFI_IR_STREAM_DECODING_METHODS_HPP diff --git a/components/core/src/ffi/ir_stream/decoding_methods.tpp b/components/core/src/ffi/ir_stream/decoding_methods.tpp new file mode 100644 index 000000000..c02a933dc --- /dev/null +++ b/components/core/src/ffi/ir_stream/decoding_methods.tpp @@ -0,0 +1,141 @@ +#ifndef FFI_IR_STREAM_DECODING_METHODS_TPP +#define FFI_IR_STREAM_DECODING_METHODS_TPP + +#include +#include + +#include "../encoding_methods.hpp" +#include "decoding_methods.hpp" +#include "protocol_constants.hpp" + +namespace ffi::ir_stream { +template < + typename encoded_variable_t, + typename ConstantHandler, + typename EncodedIntHandler, + typename EncodedFloatHandler, + typename DictVarHandler> +void generic_decode_message( + std::string const& logtype, + std::vector const& encoded_vars, + std::vector const& dict_vars, + ConstantHandler constant_handler, + EncodedIntHandler encoded_int_handler, + EncodedFloatHandler encoded_float_handler, + DictVarHandler dict_var_handler +) { + auto const logtype_length = logtype.length(); + auto const encoded_vars_length = encoded_vars.size(); + auto const dict_vars_length = dict_vars.size(); + size_t next_static_text_begin_pos = 0; + + size_t dictionary_vars_ix = 0; + size_t encoded_vars_ix = 0; + for (size_t cur_pos = 0; cur_pos < logtype_length; ++cur_pos) { + auto c = logtype[cur_pos]; + switch (c) { + case enum_to_underlying_type(VariablePlaceholder::Float): { + constant_handler( + logtype, + next_static_text_begin_pos, + cur_pos - next_static_text_begin_pos + ); + next_static_text_begin_pos = cur_pos + 1; + if (encoded_vars_ix >= encoded_vars_length) { + throw DecodingException( + ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + cTooFewEncodedVarsErrorMessage + ); + } + encoded_float_handler(encoded_vars[encoded_vars_ix]); + ++encoded_vars_ix; + + break; + } + + case enum_to_underlying_type(VariablePlaceholder::Integer): { + constant_handler( + logtype, + next_static_text_begin_pos, + cur_pos - next_static_text_begin_pos + ); + next_static_text_begin_pos = cur_pos + 1; + if (encoded_vars_ix >= encoded_vars_length) { + throw DecodingException( + ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + cTooFewEncodedVarsErrorMessage + ); + } + encoded_int_handler(encoded_vars[encoded_vars_ix]); + ++encoded_vars_ix; + + break; + } + + case enum_to_underlying_type(VariablePlaceholder::Dictionary): { + constant_handler( + logtype, + next_static_text_begin_pos, + cur_pos - next_static_text_begin_pos + ); + next_static_text_begin_pos = cur_pos + 1; + if (dictionary_vars_ix >= dict_vars_length) { + throw DecodingException( + ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + cTooFewDictionaryVarsErrorMessage + ); + } + dict_var_handler(dict_vars[dictionary_vars_ix]); + ++dictionary_vars_ix; + + break; + } + + case cVariablePlaceholderEscapeCharacter: { + // Ensure the escape character is followed by a + // character that's being escaped + if (cur_pos == logtype_length - 1) { + throw DecodingException( + ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + cUnexpectedEscapeCharacterMessage + ); + } + constant_handler( + logtype, + next_static_text_begin_pos, + cur_pos - next_static_text_begin_pos + ); + + // Skip the escape character + next_static_text_begin_pos = cur_pos + 1; + // The character after the escape character is static text + // (regardless of whether it is a variable placeholder), so + // increment cur_pos by 1 to ensure we don't process the + // next character in any of the other cases (instead it will + // be added to the message). + ++cur_pos; + + break; + } + } + } + // Add remainder + if (next_static_text_begin_pos < logtype_length) { + constant_handler( + logtype, + next_static_text_begin_pos, + logtype_length - next_static_text_begin_pos + ); + } +} +} // namespace ffi::ir_stream + +#endif // FFI_IR_STREAM_DECODING_METHODS_TPP diff --git a/components/core/src/ir/LogEvent.hpp b/components/core/src/ir/LogEvent.hpp new file mode 100644 index 000000000..c84eba2f8 --- /dev/null +++ b/components/core/src/ir/LogEvent.hpp @@ -0,0 +1,52 @@ +#ifndef IR_LOGEVENT_HPP +#define IR_LOGEVENT_HPP + +#include +#include + +#include "../Defs.h" +#include "../ffi/encoding_methods.hpp" + +namespace ir { +/** + * A class representing a log event encoded using CLP's IR + * @tparam encoded_variable_t The type of encoded variables in the event + */ +template +class LogEvent { +public: + // Constructors + LogEvent( + ffi::epoch_time_ms_t timestamp, + std::string logtype, + std::vector dict_vars, + std::vector encoded_vars + ) + : m_timestamp{timestamp}, + m_logtype{std::move(logtype)}, + m_dict_vars{std::move(dict_vars)}, + m_encoded_vars{std::move(encoded_vars)} {} + + // Methods + [[nodiscard]] auto get_timestamp() const -> ffi::epoch_time_ms_t { return m_timestamp; } + + [[nodiscard]] auto get_logtype() const -> std::string const& { return m_logtype; } + + [[nodiscard]] auto get_dict_vars() const -> std::vector const& { + return m_dict_vars; + } + + [[nodiscard]] auto get_encoded_vars() const -> std::vector const& { + return m_encoded_vars; + } + +private: + // Variables + ffi::epoch_time_ms_t m_timestamp; + std::string m_logtype; + std::vector m_dict_vars; + std::vector m_encoded_vars; +}; +} // namespace ir + +#endif // IR_LOGEVENT_HPP diff --git a/components/core/src/ir/LogEventDeserializer.cpp b/components/core/src/ir/LogEventDeserializer.cpp new file mode 100644 index 000000000..16ba71ba2 --- /dev/null +++ b/components/core/src/ir/LogEventDeserializer.cpp @@ -0,0 +1,118 @@ +#include "LogEventDeserializer.hpp" + +#include + +#include + +#include "../ffi/ir_stream/decoding_methods.hpp" + +namespace ir { +template +auto LogEventDeserializer::create(ReaderInterface& reader) + -> BOOST_OUTCOME_V2_NAMESPACE::std_result> { + ffi::ir_stream::encoded_tag_t metadata_type{0}; + std::vector metadata; + auto ir_error_code = ffi::ir_stream::decode_preamble(reader, metadata_type, metadata); + if (ffi::ir_stream::IRErrorCode_Success != ir_error_code) { + switch (ir_error_code) { + case ffi::ir_stream::IRErrorCode_Incomplete_IR: + return std::errc::result_out_of_range; + case ffi::ir_stream::IRErrorCode_Corrupted_IR: + default: + return std::errc::protocol_error; + } + } + + if (ffi::ir_stream::cProtocol::Metadata::EncodingJson != metadata_type) { + return std::errc::protocol_not_supported; + } + + // Parse metadata and validate version + auto metadata_json = nlohmann::json::parse(metadata, nullptr, false); + if (metadata_json.is_discarded()) { + return std::errc::protocol_error; + } + auto version_iter = metadata_json.find(ffi::ir_stream::cProtocol::Metadata::VersionKey); + if (metadata_json.end() == version_iter || false == version_iter->is_string()) { + return std::errc::protocol_error; + } + auto metadata_version = version_iter->get_ref(); + if (static_cast(ffi::ir_stream::cProtocol::Metadata::VersionValue) + != metadata_version) + { + return std::errc::protocol_not_supported; + } + + if constexpr (std::is_same_v) { + return LogEventDeserializer{reader}; + } + if constexpr (std::is_same_v) { + // Get reference timestamp + auto ref_timestamp_iter + = metadata_json.find(ffi::ir_stream::cProtocol::Metadata::ReferenceTimestampKey); + if (metadata_json.end() == ref_timestamp_iter || false == ref_timestamp_iter->is_string()) { + return std::errc::protocol_error; + } + auto ref_timestamp_str = ref_timestamp_iter->get_ref(); + ffi::epoch_time_ms_t ref_timestamp{}; + if (false == convert_string_to_int(ref_timestamp_str, ref_timestamp)) { + return std::errc::protocol_error; + } + + return LogEventDeserializer{reader, ref_timestamp}; + } +} + +template +auto LogEventDeserializer::deserialize_log_event() + -> BOOST_OUTCOME_V2_NAMESPACE::std_result> { + ffi::epoch_time_ms_t timestamp_or_timestamp_delta{}; + std::string logtype; + std::vector dict_vars; + std::vector encoded_vars; + + auto ir_error_code = ffi::ir_stream::deserialize_ir_message( + m_reader, + logtype, + encoded_vars, + dict_vars, + timestamp_or_timestamp_delta + ); + if (ffi::ir_stream::IRErrorCode_Success != ir_error_code) { + switch (ir_error_code) { + case ffi::ir_stream::IRErrorCode_Eof: + return std::errc::no_message_available; + case ffi::ir_stream::IRErrorCode_Incomplete_IR: + return std::errc::result_out_of_range; + case ffi::ir_stream::IRErrorCode_Corrupted_IR: + default: + return std::errc::protocol_error; + } + } + + ffi::epoch_time_ms_t timestamp{}; + if constexpr (std::is_same_v) { + timestamp = timestamp_or_timestamp_delta; + } else { // std::is_same_v + m_prev_msg_timestamp += timestamp_or_timestamp_delta; + timestamp = m_prev_msg_timestamp; + } + + return LogEvent{timestamp, logtype, dict_vars, encoded_vars}; +} + +// Explicitly declare template specializations so that we can define the +// template methods in this file +template auto +LogEventDeserializer::create(ReaderInterface& reader) + -> BOOST_OUTCOME_V2_NAMESPACE::std_result< + LogEventDeserializer>; +template auto +LogEventDeserializer::create(ReaderInterface& reader) + -> BOOST_OUTCOME_V2_NAMESPACE::std_result< + LogEventDeserializer>; +template auto LogEventDeserializer::deserialize_log_event() + -> BOOST_OUTCOME_V2_NAMESPACE::std_result>; +template auto LogEventDeserializer::deserialize_log_event() + -> BOOST_OUTCOME_V2_NAMESPACE::std_result>; +} // namespace ir diff --git a/components/core/src/ir/LogEventDeserializer.hpp b/components/core/src/ir/LogEventDeserializer.hpp new file mode 100644 index 000000000..e392f0157 --- /dev/null +++ b/components/core/src/ir/LogEventDeserializer.hpp @@ -0,0 +1,85 @@ +#ifndef IR_LOGEVENTDESERIALIZER_HPP +#define IR_LOGEVENTDESERIALIZER_HPP + +#include + +#include + +#include "../ffi/encoding_methods.hpp" +#include "../ReaderInterface.hpp" +#include "../TimestampPattern.hpp" +#include "../TraceableException.hpp" +#include "LogEvent.hpp" + +namespace ir { +/** + * Class for deserializing IR log events from an IR stream. + * + * TODO: We're currently returning std::errc error codes, but we should replace + * these with our own custom error codes (derived from std::error_code), ideally + * replacing IRErrorCode. + * @tparam encoded_variable_t Type of encoded variables in the stream + */ +template +class LogEventDeserializer { +public: + // Factory functions + /** + * Creates a log event deserializer for the given stream + * @param reader A reader for the IR stream + * @return A result containing the serializer or an error code indicating + * the failure: + * - std::errc::result_out_of_range if the IR stream is truncated + * - std::errc::protocol_error if the IR stream is corrupted + * - std::errc::protocol_not_supported if the IR stream contains an + * unsupported metadata format or uses an unsupported version + */ + static auto create(ReaderInterface& reader) + -> BOOST_OUTCOME_V2_NAMESPACE::std_result>; + + // Delete copy constructor and assignment + LogEventDeserializer(LogEventDeserializer const&) = delete; + auto operator=(LogEventDeserializer const&) -> LogEventDeserializer& = delete; + + // Define default move constructor and assignment + LogEventDeserializer(LogEventDeserializer&&) = default; + auto operator=(LogEventDeserializer&&) -> LogEventDeserializer& = default; + + ~LogEventDeserializer() = default; + + // Methods + [[nodiscard]] auto get_timestamp_pattern() const -> TimestampPattern const& { + return m_timestamp_pattern; + } + + /** + * Deserializes a log event from the stream + * @return A result containing the log event or an error code indicating + * the failure: + * - std::errc::no_message_available on reaching the end of the IR stream + * - std::errc::result_out_of_range if the IR stream is truncated + * - std::errc::result_out_of_range if the IR stream is corrupted + */ + [[nodiscard]] auto deserialize_log_event() + -> BOOST_OUTCOME_V2_NAMESPACE::std_result>; + +private: + // Constructors + explicit LogEventDeserializer(ReaderInterface& reader) : m_reader{reader} {} + + LogEventDeserializer(ReaderInterface& reader, ffi::epoch_time_ms_t ref_timestamp) + : m_reader{reader}, + m_prev_msg_timestamp{ref_timestamp} {} + + // Variables + TimestampPattern m_timestamp_pattern{0, "%Y-%m-%dT%H:%M:%S.%3"}; + [[no_unique_address]] std::conditional_t< + std::is_same_v, + ffi::epoch_time_ms_t, + EmptyType> + m_prev_msg_timestamp{}; + ReaderInterface& m_reader; +}; +} // namespace ir + +#endif // IR_LOGEVENTDESERIALIZER_HPP diff --git a/components/core/src/ir/utils.cpp b/components/core/src/ir/utils.cpp new file mode 100644 index 000000000..9f55b6678 --- /dev/null +++ b/components/core/src/ir/utils.cpp @@ -0,0 +1,13 @@ +#include "utils.hpp" + +#include "../BufferReader.hpp" +#include "../ffi/ir_stream/decoding_methods.hpp" + +namespace ir { +auto has_ir_stream_magic_number(std::string_view buf) -> bool { + BufferReader buf_reader{buf.data(), buf.size()}; + bool is_four_bytes_encoded{false}; + return ffi::ir_stream::IRErrorCode_Success + == ffi::ir_stream::get_encoding_type(buf_reader, is_four_bytes_encoded); +} +} // namespace ir diff --git a/components/core/src/ir/utils.hpp b/components/core/src/ir/utils.hpp new file mode 100644 index 000000000..8e48e44f4 --- /dev/null +++ b/components/core/src/ir/utils.hpp @@ -0,0 +1,15 @@ +#ifndef IR_UTILS_HPP +#define IR_UTILS_HPP + +#include + +namespace ir { +/** + * @param buf + * @return Whether the content in the buffer starts with one of the IR stream + * magic numbers + */ +auto has_ir_stream_magic_number(std::string_view buf) -> bool; +} // namespace ir + +#endif // IR_UTILS_HPP diff --git a/components/core/src/streaming_archive/writer/Archive.cpp b/components/core/src/streaming_archive/writer/Archive.cpp index ba09d4b29..415d599e4 100644 --- a/components/core/src/streaming_archive/writer/Archive.cpp +++ b/components/core/src/streaming_archive/writer/Archive.cpp @@ -259,14 +259,7 @@ namespace streaming_archive::writer { m_file->write_encoded_msg(timestamp, logtype_id, encoded_vars, var_ids, num_uncompressed_bytes); - // Update segment indices - if (m_file->has_ts_pattern()) { - m_logtype_ids_in_segment_for_files_with_timestamps.insert(logtype_id); - m_var_ids_in_segment_for_files_with_timestamps.insert_all(var_ids); - } else { - m_logtype_ids_for_file_with_unassigned_segment.insert(logtype_id); - m_var_ids_for_file_with_unassigned_segment.insert(var_ids.cbegin(), var_ids.cend()); - } + update_segment_indices(logtype_id, var_ids); } void Archive::write_msg_using_schema (compressor_frontend::Token*& uncompressed_msg, uint32_t uncompressed_msg_pos, const bool has_delimiter, @@ -370,23 +363,57 @@ namespace streaming_archive::writer { m_logtype_dict.add_entry(m_logtype_dict_entry, logtype_id); m_file->write_encoded_msg(timestamp, logtype_id, m_encoded_vars, m_var_ids, num_uncompressed_bytes); - // Update segment indices - if (m_file->has_ts_pattern()) { - m_logtype_ids_in_segment_for_files_with_timestamps.insert(logtype_id); - m_var_ids_in_segment_for_files_with_timestamps.insert_all(m_var_ids); - } else { - m_logtype_ids_for_file_with_unassigned_segment.insert(logtype_id); - m_var_ids_for_file_with_unassigned_segment.insert(m_var_ids.cbegin(), m_var_ids.cend()); - } + update_segment_indices(logtype_id, m_var_ids); } } + template + void Archive::write_log_event_ir(ir::LogEvent const& log_event) { + vector encoded_vars; + vector var_ids; + size_t original_num_bytes{0}; + EncodedVariableInterpreter::encode_and_add_to_dictionary( + log_event, + m_logtype_dict_entry, + m_var_dict, + encoded_vars, + var_ids, + original_num_bytes + ); + + logtype_dictionary_id_t logtype_id{cLogtypeDictionaryIdMax}; + m_logtype_dict.add_entry(m_logtype_dict_entry, logtype_id); + + m_file->write_encoded_msg( + log_event.get_timestamp(), + logtype_id, + encoded_vars, + var_ids, + original_num_bytes + ); + + update_segment_indices(logtype_id, var_ids); + } + void Archive::write_dir_snapshot () { // Flush dictionaries m_logtype_dict.write_header_and_flush_to_disk(); m_var_dict.write_header_and_flush_to_disk(); } + void Archive::update_segment_indices( + logtype_dictionary_id_t logtype_id, + vector const& var_ids + ) { + if (m_file->has_ts_pattern()) { + m_logtype_ids_in_segment_for_files_with_timestamps.insert(logtype_id); + m_var_ids_in_segment_for_files_with_timestamps.insert_all(var_ids); + } else { + m_logtype_ids_for_file_with_unassigned_segment.insert(logtype_id); + m_var_ids_for_file_with_unassigned_segment.insert(var_ids.cbegin(), var_ids.cend()); + } + } + void Archive::append_file_contents_to_segment (Segment& segment, ArrayBackedPosIntSet& logtype_ids_in_segment, ArrayBackedPosIntSet& var_ids_in_segment, vector& files_in_segment) { @@ -522,4 +549,13 @@ namespace streaming_archive::writer { std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore) << std::endl; } } + + // Explicitly declare template specializations so that we can define the + // template methods in this file + template void Archive::write_log_event_ir( + ir::LogEvent const& log_event + ); + template void Archive::write_log_event_ir( + ir::LogEvent const& log_event + ); } diff --git a/components/core/src/streaming_archive/writer/Archive.hpp b/components/core/src/streaming_archive/writer/Archive.hpp index 8400061c5..64569a9f6 100644 --- a/components/core/src/streaming_archive/writer/Archive.hpp +++ b/components/core/src/streaming_archive/writer/Archive.hpp @@ -19,6 +19,7 @@ #include "../../compressor_frontend/Token.hpp" #include "../../ErrorCode.hpp" #include "../../GlobalMetadataDB.hpp" +#include "../../ir/LogEvent.hpp" #include "../../LogTypeDictionaryWriter.hpp" #include "../../VariableDictionaryWriter.hpp" #include "../ArchiveMetadata.hpp" @@ -140,6 +141,15 @@ namespace streaming_archive { namespace writer { */ void write_msg_using_schema (compressor_frontend::Token*& uncompressed_msg, uint32_t uncompressed_msg_pos, bool has_delimiter, bool has_timestamp); + /** + * Writes an IR log event to the current encoded file + * @tparam encoded_variable_t The type of the encoded variables in the + * log event + * @param log_event + */ + template + void write_log_event_ir(ir::LogEvent const& log_event); + /** * Writes snapshot of archive to disk including metadata of all files and new dictionary entries * @throw FileWriter::OperationFailed if failed to write or flush dictionaries @@ -200,6 +210,11 @@ namespace streaming_archive { namespace writer { }; // Methods + void update_segment_indices( + logtype_dictionary_id_t logtype_id, + std::vector const& var_ids + ); + /** * Appends the content of the current encoded file to the given segment * @param segment diff --git a/components/core/src/type_utils.hpp b/components/core/src/type_utils.hpp index 53f6e5742..b05f58524 100644 --- a/components/core/src/type_utils.hpp +++ b/components/core/src/type_utils.hpp @@ -5,6 +5,12 @@ #include #include +/** + * An empty type which can be used to declare variables conditionally based on + * template parameters + */ +struct EmptyType {}; + /** * Gets the underlying type of the given enum * @tparam T @@ -66,5 +72,4 @@ std::enable_if_t return reinterpret_cast(src); } - #endif // TYPE_UTILS_HPP diff --git a/components/core/submodules/boost-outcome b/components/core/submodules/boost-outcome new file mode 160000 index 000000000..39500a331 --- /dev/null +++ b/components/core/submodules/boost-outcome @@ -0,0 +1 @@ +Subproject commit 39500a33117c23596673c1925479c7ff01b602f6 diff --git a/components/core/tools/scripts/deps-download/boost-outcome.json b/components/core/tools/scripts/deps-download/boost-outcome.json new file mode 100644 index 000000000..01e89b394 --- /dev/null +++ b/components/core/tools/scripts/deps-download/boost-outcome.json @@ -0,0 +1,10 @@ +{ + "url": "https://github.com/boostorg/outcome/archive/refs/tags/boost-1.83.0.zip", + "unzip": true, + "targets": [ + { + "source": "outcome-boost-1.83.0", + "destination": "submodules/boost-outcome" + } + ] +} diff --git a/components/core/tools/scripts/deps-download/download-all.sh b/components/core/tools/scripts/deps-download/download-all.sh index 3a6688e5b..07c5a427b 100755 --- a/components/core/tools/scripts/deps-download/download-all.sh +++ b/components/core/tools/scripts/deps-download/download-all.sh @@ -21,6 +21,7 @@ python3 "${script_dir}/download-dep.py" "${script_dir}/antlr4.json" if [ -e "$project_root_dir/.git" ] ; then git submodule update --init --recursive else + python3 "${script_dir}/download-dep.py" "${script_dir}/boost-outcome.json" python3 "${script_dir}/download-dep.py" "${script_dir}/Catch2.json" python3 "${script_dir}/download-dep.py" "${script_dir}/date.json" python3 "${script_dir}/download-dep.py" "${script_dir}/json.json"