From 1830a62805891658b4184b9a932dec35b921a6ea Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 18 Nov 2024 12:27:44 +0000 Subject: [PATCH 01/17] Add a GET /node/snapshots endpoint --- include/ccf/node/startup_config.h | 15 ++++++-- src/common/configuration.h | 7 +++- src/host/configuration.h | 17 +------- src/host/main.cpp | 2 - src/host/snapshots.h | 64 +++++++++++++++++-------------- src/node/node_state.h | 2 +- src/node/rpc/node_frontend.h | 58 ++++++++++++++++++++++++++-- 7 files changed, 111 insertions(+), 54 deletions(-) diff --git a/include/ccf/node/startup_config.h b/include/ccf/node/startup_config.h index 69e28a0772c5..8b3355e320f0 100644 --- a/include/ccf/node/startup_config.h +++ b/include/ccf/node/startup_config.h @@ -74,15 +74,24 @@ struct CCFConfig bool operator==(const Attestation&) const = default; }; Attestation attestation = {}; + + struct Snapshots + { + std::string directory = "snapshots"; + size_t tx_count = 10'000; + std::optional read_only_directory = std::nullopt; + + bool operator==(const Snapshots&) const = default; + }; + Snapshots snapshots = {}; }; -struct StartupConfig : CCFConfig +struct StartupConfig : public CCFConfig { StartupConfig() = default; StartupConfig(const CCFConfig& common_base) : CCFConfig(common_base) {} std::string startup_host_time; - size_t snapshot_tx_interval = 10'000; // Only if starting or recovering size_t initial_service_certificate_validity_days = 1; @@ -118,4 +127,4 @@ struct StartupConfig : CCFConfig std::nullopt; }; Recover recover = {}; -}; \ No newline at end of file +}; diff --git a/src/common/configuration.h b/src/common/configuration.h index 7ea0eb9e5109..b4ecf9033cba 100644 --- a/src/common/configuration.h +++ b/src/common/configuration.h @@ -82,6 +82,11 @@ DECLARE_JSON_OPTIONAL_FIELDS( snp_security_policy_file, snp_uvm_endorsements_file); +DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::Snapshots); +DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::Snapshots); +DECLARE_JSON_OPTIONAL_FIELDS( + CCFConfig::Snapshots, directory, tx_count, read_only_directory); + DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig); DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network); DECLARE_JSON_OPTIONAL_FIELDS( @@ -92,6 +97,7 @@ DECLARE_JSON_OPTIONAL_FIELDS( ledger_signatures, jwt, attestation, + snapshots, node_to_node_message_limit, historical_cache_soft_limit); @@ -114,7 +120,6 @@ DECLARE_JSON_TYPE_WITH_BASE(StartupConfig, CCFConfig); DECLARE_JSON_REQUIRED_FIELDS( StartupConfig, startup_host_time, - snapshot_tx_interval, initial_service_certificate_validity_days, service_subject_name, cose_signatures, diff --git a/src/host/configuration.h b/src/host/configuration.h index 7f49aeed48a9..7324a8ef68f9 100644 --- a/src/host/configuration.h +++ b/src/host/configuration.h @@ -57,7 +57,7 @@ namespace host DECLARE_JSON_OPTIONAL_FIELDS( ParsedMemberInfo, encryption_public_key_file, data_json_file); - struct CCHostConfig : CCFConfig + struct CCHostConfig : public CCFConfig { struct Enclave { @@ -103,16 +103,6 @@ namespace host }; Ledger ledger = {}; - struct Snapshots - { - std::string directory = "snapshots"; - size_t tx_count = 10'000; - std::optional read_only_directory = std::nullopt; - - bool operator==(const Snapshots&) const = default; - }; - Snapshots snapshots = {}; - struct Logging { LoggerLevel host_level = LoggerLevel::INFO; @@ -189,11 +179,6 @@ namespace host DECLARE_JSON_OPTIONAL_FIELDS( CCHostConfig::Ledger, directory, read_only_directories, chunk_size); - DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Snapshots); - DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Snapshots); - DECLARE_JSON_OPTIONAL_FIELDS( - CCHostConfig::Snapshots, directory, tx_count, read_only_directory); - DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Logging); DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Logging); DECLARE_JSON_OPTIONAL_FIELDS(CCHostConfig::Logging, host_level, format); diff --git a/src/host/main.cpp b/src/host/main.cpp index 469b233a5af4..c1533c1185d5 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -504,8 +504,6 @@ int main(int argc, char** argv) StartupConfig startup_config(config); - startup_config.snapshot_tx_interval = config.snapshots.tx_count; - if (startup_config.attestation.snp_security_policy_file.has_value()) { auto security_policy_file = diff --git a/src/host/snapshots.h b/src/host/snapshots.h index b1f8906d4ea7..9f7824bc91e8 100644 --- a/src/host/snapshots.h +++ b/src/host/snapshots.h @@ -169,6 +169,40 @@ namespace asynchost return latest_committed_snapshot_file_name; } + std::optional> + find_latest_committed_snapshot_in_directories( + const fs::path& main_directory, + const std::optional& read_only_directory) + { + // Keep track of latest snapshot file in both directories + size_t latest_idx = 0; + + std::optional read_only_latest_committed_snapshot = std::nullopt; + if (read_only_directory.has_value()) + { + read_only_latest_committed_snapshot = + find_latest_committed_snapshot_in_directory( + read_only_directory.value(), latest_idx); + } + + auto main_latest_committed_snapshot = + find_latest_committed_snapshot_in_directory(main_directory, latest_idx); + + if (main_latest_committed_snapshot.has_value()) + { + return std::make_pair( + main_directory, main_latest_committed_snapshot.value()); + } + else if (read_only_latest_committed_snapshot.has_value()) + { + return std::make_pair( + read_only_directory.value(), + read_only_latest_committed_snapshot.value()); + } + + return std::nullopt; + } + class SnapshotManager { private: @@ -311,34 +345,8 @@ namespace asynchost std::optional> find_latest_committed_snapshot() { - // Keep track of latest snapshot file in both directories - size_t latest_idx = 0; - - std::optional read_only_latest_committed_snapshot = - std::nullopt; - if (read_snapshot_dir.has_value()) - { - read_only_latest_committed_snapshot = - find_latest_committed_snapshot_in_directory( - read_snapshot_dir.value(), latest_idx); - } - - auto main_latest_committed_snapshot = - find_latest_committed_snapshot_in_directory(snapshot_dir, latest_idx); - - if (main_latest_committed_snapshot.has_value()) - { - return std::make_pair( - snapshot_dir, main_latest_committed_snapshot.value()); - } - else if (read_only_latest_committed_snapshot.has_value()) - { - return std::make_pair( - read_snapshot_dir.value(), - read_only_latest_committed_snapshot.value()); - } - - return std::nullopt; + return find_latest_committed_snapshot_in_directories( + snapshot_dir, read_snapshot_dir); } void register_message_handlers( diff --git a/src/node/node_state.h b/src/node/node_state.h index 20323ac29712..32f9edece226 100644 --- a/src/node/node_state.h +++ b/src/node/node_state.h @@ -2602,7 +2602,7 @@ namespace ccf throw std::logic_error("Snapshotter already initialised"); } snapshotter = std::make_shared( - writer_factory, network.tables, config.snapshot_tx_interval); + writer_factory, network.tables, config.snapshots.tx_count); } void read_ledger_entries(::consensus::Index from, ::consensus::Index to) diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index 5935654bf46a..73e51807e55c 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -14,9 +14,11 @@ #include "ccf/version.h" #include "crypto/certs.h" #include "crypto/csr.h" +#include "ds/files.h" #include "ds/std_formatters.h" #include "enclave/reconfiguration_type.h" #include "frontend.h" +#include "host/snapshots.h" #include "node/network_state.h" #include "node/rpc/jwt_management.h" #include "node/rpc/no_create_tx_claims_digest.cpp" @@ -1184,13 +1186,14 @@ namespace ccf ccf::errors::InternalError, "NodeConfigurationSubsystem is not available"); } + const auto& node_startup_config = + node_configuration_subsystem->get().node_config; return make_success(GetNode::Out{ node_id, ccf::NodeStatus::PENDING, is_primary, - node_configuration_subsystem->get() - .node_config.network.rpc_interfaces, - node_configuration_subsystem->get().node_config.node_data, + node_startup_config.network.rpc_interfaces, + node_startup_config.node_data, 0}); } }; @@ -1786,6 +1789,55 @@ namespace ccf .set_auto_schema() .set_forwarding_required(endpoints::ForwardingRequired::Never) .install(); + + auto get_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) { + auto node_configuration_subsystem = + this->context.get_subsystem(); + if (!node_configuration_subsystem) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + "NodeConfigurationSubsystem is not available"); + return; + } + + const auto& snapshots_config = + node_configuration_subsystem->get().node_config.snapshots; + + auto latest_committed_snapshot = + asynchost::find_latest_committed_snapshot_in_directories( + snapshots_config.directory, snapshots_config.read_only_directory); + + if (!latest_committed_snapshot.has_value()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_NOT_FOUND, + ccf::errors::ResourceNotFound, + "This node has no committed snapshots"); + return; + } + + auto& [snapshot_dir, snapshot_path] = latest_committed_snapshot.value(); + + LOG_DEBUG_FMT("Found snapshot: {}", snapshot_dir / snapshot_path); + + ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); + + ctx.rpc_ctx->set_response_header( + ccf::http::headers::CONTENT_TYPE, + ccf::http::headervalues::contenttype::OCTET_STREAM); + ctx.rpc_ctx->set_response_header( + "x-ms-ccf-snapshot-filename", snapshot_path.string()); + + ctx.rpc_ctx->set_response_body( + files::slurp(snapshot_dir / snapshot_path)); + }; + make_command_endpoint( + "/snapshot", HTTP_GET, get_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); } }; From e94194916b87f8c1ab5ac044301e6bacbdf91f32 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 18 Nov 2024 12:47:01 +0000 Subject: [PATCH 02/17] Create new snapshots directory --- src/host/main.cpp | 4 +- src/host/test/ledger.cpp | 5 +- src/node/rpc/node_frontend.h | 4 +- .../snapshots.h => snapshots/filenames.h} | 190 +---------------- src/snapshots/snapshot_manager.h | 197 ++++++++++++++++++ 5 files changed, 208 insertions(+), 192 deletions(-) rename src/{host/snapshots.h => snapshots/filenames.h} (50%) create mode 100644 src/snapshots/snapshot_manager.h diff --git a/src/host/main.cpp b/src/host/main.cpp index c1533c1185d5..8b8b844dbb8d 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -24,7 +24,7 @@ #include "process_launcher.h" #include "rpc_connections.h" #include "sig_term.h" -#include "snapshots.h" +#include "snapshots/snapshot_manager.h" #include "ticker.h" #include "time_updater.h" @@ -373,7 +373,7 @@ int main(int argc, char** argv) config.ledger.read_only_directories); ledger.register_message_handlers(bp.get_dispatcher()); - asynchost::SnapshotManager snapshots( + snapshots::SnapshotManager snapshots( config.snapshots.directory, writer_factory, config.snapshots.read_only_directory); diff --git a/src/host/test/ledger.cpp b/src/host/test/ledger.cpp index f0320a0654c5..9fb8ebd24f06 100644 --- a/src/host/test/ledger.cpp +++ b/src/host/test/ledger.cpp @@ -7,8 +7,8 @@ #include "crypto/openssl/hash.h" #include "ds/files.h" #include "ds/serialized.h" -#include "host/snapshots.h" #include "kv/serialised_entry_format.h" +#include "snapshots/snapshot_manager.h" #define DOCTEST_CONFIG_IMPLEMENT #include @@ -1259,6 +1259,8 @@ TEST_CASE("Snapshot file name" * doctest::test_suite("snapshot")) std::vector snapshot_idx_interval_ranges = { 10, 1000, 10000, std::numeric_limits::max() - 2}; + using namespace snapshots; + for (auto const& snapshot_idx_interval_range : snapshot_idx_interval_ranges) { std::uniform_int_distribution dist(1, snapshot_idx_interval_range); @@ -1304,6 +1306,7 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot")) auto snap_ro_dir = AutoDeleteFolder(snapshot_dir_read_only); fs::create_directory(snapshot_dir_read_only); + using namespace snapshots; SnapshotManager snapshots(snapshot_dir, wf, snapshot_dir_read_only); size_t snapshot_interval = 5; diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index 73e51807e55c..b45b5e5a7645 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -18,7 +18,6 @@ #include "ds/std_formatters.h" #include "enclave/reconfiguration_type.h" #include "frontend.h" -#include "host/snapshots.h" #include "node/network_state.h" #include "node/rpc/jwt_management.h" #include "node/rpc/no_create_tx_claims_digest.cpp" @@ -27,6 +26,7 @@ #include "node_interface.h" #include "service/internal_tables_access.h" #include "service/tables/previous_service_identity.h" +#include "snapshots/filenames.h" namespace ccf { @@ -1806,7 +1806,7 @@ namespace ccf node_configuration_subsystem->get().node_config.snapshots; auto latest_committed_snapshot = - asynchost::find_latest_committed_snapshot_in_directories( + snapshots::find_latest_committed_snapshot_in_directories( snapshots_config.directory, snapshots_config.read_only_directory); if (!latest_committed_snapshot.has_value()) diff --git a/src/host/snapshots.h b/src/snapshots/filenames.h similarity index 50% rename from src/host/snapshots.h rename to src/snapshots/filenames.h index 9f7824bc91e8..56dea2912d50 100644 --- a/src/host/snapshots.h +++ b/src/snapshots/filenames.h @@ -2,19 +2,13 @@ // Licensed under the Apache 2.0 License. #pragma once -#include "ccf/ds/nonstd.h" -#include "consensus/ledger_enclave_types.h" -#include "time_bound_logger.h" - -#include #include -#include -#include #include +#include namespace fs = std::filesystem; -namespace asynchost +namespace snapshots { static constexpr auto snapshot_file_prefix = "snapshot"; static constexpr auto snapshot_idx_delimiter = "_"; @@ -202,182 +196,4 @@ namespace asynchost return std::nullopt; } - - class SnapshotManager - { - private: - ringbuffer::WriterPtr to_enclave; - - const fs::path snapshot_dir; - const std::optional read_snapshot_dir = std::nullopt; - - struct PendingSnapshot - { - ::consensus::Index evidence_idx; - std::shared_ptr> snapshot; - }; - std::map pending_snapshots; - - public: - SnapshotManager( - const std::string& snapshot_dir_, - ringbuffer::AbstractWriterFactory& writer_factory, - const std::optional& read_snapshot_dir_ = std::nullopt) : - to_enclave(writer_factory.create_writer_to_inside()), - snapshot_dir(snapshot_dir_), - read_snapshot_dir(read_snapshot_dir_) - { - if (fs::is_directory(snapshot_dir)) - { - LOG_INFO_FMT( - "Snapshots will be stored in existing directory: {}", snapshot_dir); - } - else if (!fs::create_directory(snapshot_dir)) - { - throw std::logic_error( - fmt::format("Could not create snapshot directory: {}", snapshot_dir)); - } - - if ( - read_snapshot_dir.has_value() && - !fs::is_directory(read_snapshot_dir.value())) - { - throw std::logic_error(fmt::format( - "{} read-only snapshot is not a directory", - read_snapshot_dir.value())); - } - } - - fs::path get_main_directory() const - { - return snapshot_dir; - } - - std::shared_ptr> add_pending_snapshot( - ::consensus::Index idx, - ::consensus::Index evidence_idx, - size_t requested_size) - { - auto snapshot = std::make_shared>(requested_size); - pending_snapshots.emplace(idx, PendingSnapshot{evidence_idx, snapshot}); - - LOG_DEBUG_FMT( - "Added pending snapshot {} [{} bytes]", idx, requested_size); - - return snapshot; - } - - void commit_snapshot( - ::consensus::Index snapshot_idx, - const uint8_t* receipt_data, - size_t receipt_size) - { - TimeBoundLogger log_if_slow( - fmt::format("Committing snapshot - snapshot_idx={}", snapshot_idx)); - - try - { - for (auto it = pending_snapshots.begin(); it != pending_snapshots.end(); - it++) - { - if (snapshot_idx == it->first) - { - // e.g. snapshot_100_105.committed - auto file_name = fmt::format( - "{}{}{}{}{}{}", - snapshot_file_prefix, - snapshot_idx_delimiter, - it->first, - snapshot_idx_delimiter, - it->second.evidence_idx, - snapshot_committed_suffix); - auto full_snapshot_path = snapshot_dir / file_name; - - if (fs::exists(full_snapshot_path)) - { - // In the case that a file with this name already exists, keep - // existing file and drop pending snapshot - LOG_FAIL_FMT( - "Cannot write snapshot as file already exists: {}", file_name); - } - else - { - std::ofstream snapshot_file( - full_snapshot_path, std::ios::app | std::ios::binary); - if (!snapshot_file.good()) - { - LOG_FAIL_FMT( - "Cannot write snapshot: error opening file {}", file_name); - } - else - { - const auto& snapshot = it->second.snapshot; - snapshot_file.write( - reinterpret_cast(snapshot->data()), - snapshot->size()); - snapshot_file.write( - reinterpret_cast(receipt_data), receipt_size); - - LOG_INFO_FMT( - "New snapshot file written to {} [{} bytes]", - file_name, - static_cast(snapshot_file.tellp())); - } - } - - pending_snapshots.erase(it); - - return; - } - } - - LOG_FAIL_FMT("Could not find snapshot to commit at {}", snapshot_idx); - } - catch (std::exception& e) - { - LOG_FAIL_FMT( - "Exception while attempting to commit snapshot at {}: {}", - snapshot_idx, - e.what()); - } - } - - std::optional> - find_latest_committed_snapshot() - { - return find_latest_committed_snapshot_in_directories( - snapshot_dir, read_snapshot_dir); - } - - void register_message_handlers( - messaging::Dispatcher& disp) - { - DISPATCHER_SET_MESSAGE_HANDLER( - disp, - ::consensus::snapshot_allocate, - [this](const uint8_t* data, size_t size) { - auto idx = serialized::read<::consensus::Index>(data, size); - auto evidence_idx = serialized::read<::consensus::Index>(data, size); - auto requested_size = serialized::read(data, size); - auto generation_count = serialized::read(data, size); - - auto snapshot = - add_pending_snapshot(idx, evidence_idx, requested_size); - - RINGBUFFER_WRITE_MESSAGE( - ::consensus::snapshot_allocated, - to_enclave, - std::span{snapshot->data(), snapshot->size()}, - generation_count); - }); - - DISPATCHER_SET_MESSAGE_HANDLER( - disp, - ::consensus::snapshot_commit, - [this](const uint8_t* data, size_t size) { - auto snapshot_idx = serialized::read<::consensus::Index>(data, size); - commit_snapshot(snapshot_idx, data, size); - }); - } - }; -} +} \ No newline at end of file diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h new file mode 100644 index 000000000000..bb65dca14683 --- /dev/null +++ b/src/snapshots/snapshot_manager.h @@ -0,0 +1,197 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/ds/nonstd.h" +#include "consensus/ledger_enclave_types.h" +#include "host/time_bound_logger.h" +#include "snapshots/filenames.h" + +#include +#include +#include +#include +#include + +namespace fs = std::filesystem; + +namespace snapshots +{ + class SnapshotManager + { + private: + ringbuffer::WriterPtr to_enclave; + + const fs::path snapshot_dir; + const std::optional read_snapshot_dir = std::nullopt; + + struct PendingSnapshot + { + ::consensus::Index evidence_idx; + std::shared_ptr> snapshot; + }; + std::map pending_snapshots; + + public: + SnapshotManager( + const std::string& snapshot_dir_, + ringbuffer::AbstractWriterFactory& writer_factory, + const std::optional& read_snapshot_dir_ = std::nullopt) : + to_enclave(writer_factory.create_writer_to_inside()), + snapshot_dir(snapshot_dir_), + read_snapshot_dir(read_snapshot_dir_) + { + if (fs::is_directory(snapshot_dir)) + { + LOG_INFO_FMT( + "Snapshots will be stored in existing directory: {}", snapshot_dir); + } + else if (!fs::create_directory(snapshot_dir)) + { + throw std::logic_error( + fmt::format("Could not create snapshot directory: {}", snapshot_dir)); + } + + if ( + read_snapshot_dir.has_value() && + !fs::is_directory(read_snapshot_dir.value())) + { + throw std::logic_error(fmt::format( + "{} read-only snapshot is not a directory", + read_snapshot_dir.value())); + } + } + + fs::path get_main_directory() const + { + return snapshot_dir; + } + + std::shared_ptr> add_pending_snapshot( + ::consensus::Index idx, + ::consensus::Index evidence_idx, + size_t requested_size) + { + auto snapshot = std::make_shared>(requested_size); + pending_snapshots.emplace(idx, PendingSnapshot{evidence_idx, snapshot}); + + LOG_DEBUG_FMT( + "Added pending snapshot {} [{} bytes]", idx, requested_size); + + return snapshot; + } + + void commit_snapshot( + ::consensus::Index snapshot_idx, + const uint8_t* receipt_data, + size_t receipt_size) + { + asynchost::TimeBoundLogger log_if_slow( + fmt::format("Committing snapshot - snapshot_idx={}", snapshot_idx)); + + try + { + for (auto it = pending_snapshots.begin(); it != pending_snapshots.end(); + it++) + { + if (snapshot_idx == it->first) + { + // e.g. snapshot_100_105.committed + auto file_name = fmt::format( + "{}{}{}{}{}{}", + snapshot_file_prefix, + snapshot_idx_delimiter, + it->first, + snapshot_idx_delimiter, + it->second.evidence_idx, + snapshot_committed_suffix); + auto full_snapshot_path = snapshot_dir / file_name; + + if (fs::exists(full_snapshot_path)) + { + // In the case that a file with this name already exists, keep + // existing file and drop pending snapshot + LOG_FAIL_FMT( + "Cannot write snapshot as file already exists: {}", file_name); + } + else + { + std::ofstream snapshot_file( + full_snapshot_path, std::ios::app | std::ios::binary); + if (!snapshot_file.good()) + { + LOG_FAIL_FMT( + "Cannot write snapshot: error opening file {}", file_name); + } + else + { + const auto& snapshot = it->second.snapshot; + snapshot_file.write( + reinterpret_cast(snapshot->data()), + snapshot->size()); + snapshot_file.write( + reinterpret_cast(receipt_data), receipt_size); + + LOG_INFO_FMT( + "New snapshot file written to {} [{} bytes]", + file_name, + static_cast(snapshot_file.tellp())); + } + } + + pending_snapshots.erase(it); + + return; + } + } + + LOG_FAIL_FMT("Could not find snapshot to commit at {}", snapshot_idx); + } + catch (std::exception& e) + { + LOG_FAIL_FMT( + "Exception while attempting to commit snapshot at {}: {}", + snapshot_idx, + e.what()); + } + } + + std::optional> + find_latest_committed_snapshot() + { + return find_latest_committed_snapshot_in_directories( + snapshot_dir, read_snapshot_dir); + } + + void register_message_handlers( + messaging::Dispatcher& disp) + { + DISPATCHER_SET_MESSAGE_HANDLER( + disp, + ::consensus::snapshot_allocate, + [this](const uint8_t* data, size_t size) { + auto idx = serialized::read<::consensus::Index>(data, size); + auto evidence_idx = serialized::read<::consensus::Index>(data, size); + auto requested_size = serialized::read(data, size); + auto generation_count = serialized::read(data, size); + + auto snapshot = + add_pending_snapshot(idx, evidence_idx, requested_size); + + RINGBUFFER_WRITE_MESSAGE( + ::consensus::snapshot_allocated, + to_enclave, + std::span{snapshot->data(), snapshot->size()}, + generation_count); + }); + + DISPATCHER_SET_MESSAGE_HANDLER( + disp, + ::consensus::snapshot_commit, + [this](const uint8_t* data, size_t size) { + auto snapshot_idx = serialized::read<::consensus::Index>(data, size); + commit_snapshot(snapshot_idx, data, size); + }); + } + }; +} From e55961842b3a8ba64a09f33126a49a00248bebb1 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 20 Nov 2024 17:03:41 +0000 Subject: [PATCH 03/17] Redirect to a fixed-file endpoint, support Range header --- src/http/http_builder.h | 32 ++-- src/http/http_session.h | 7 +- src/node/rpc/node_frontend.h | 242 +++++++++++++++++++++++++++++-- src/snapshots/filenames.h | 34 ----- src/snapshots/snapshot_manager.h | 30 +++- 5 files changed, 287 insertions(+), 58 deletions(-) diff --git a/src/http/http_builder.h b/src/http/http_builder.h index 3ce285749152..ae0288aafb97 100644 --- a/src/http/http_builder.h +++ b/src/http/http_builder.h @@ -71,34 +71,48 @@ namespace http return body; } - void set_body(const std::vector* b) + void set_body( + const std::vector* b, bool overwrite_content_length = true) { if (b != nullptr) { - set_body(b->data(), b->size()); + set_body(b->data(), b->size(), overwrite_content_length); } else { - set_body(nullptr, 0); + set_body(nullptr, 0, overwrite_content_length); } } - void set_body(const uint8_t* b, size_t s) + void set_body( + const uint8_t* b, size_t s, bool overwrite_content_length = true) { + LOG_INFO_FMT("!!! Called set_body (raw)"); body = b; body_size = s; - headers[ccf::http::headers::CONTENT_LENGTH] = - fmt::format("{}", get_content_length()); + if ( + overwrite_content_length || + headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end()) + { + headers[ccf::http::headers::CONTENT_LENGTH] = + fmt::format("{}", get_content_length()); + } } - void set_body(const std::string& s) + void set_body(const std::string& s, bool overwrite_content_length = true) { + LOG_INFO_FMT("!!! Called set_body (string)"); body = (uint8_t*)s.data(); body_size = s.size(); - headers[ccf::http::headers::CONTENT_LENGTH] = - fmt::format("{}", get_content_length()); + if ( + overwrite_content_length || + headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end()) + { + headers[ccf::http::headers::CONTENT_LENGTH] = + fmt::format("{}", get_content_length()); + } } }; diff --git a/src/http/http_session.h b/src/http/http_session.h index dace8fb8d0fa..8e8b4ddbaa67 100644 --- a/src/http/http_session.h +++ b/src/http/http_session.h @@ -283,7 +283,12 @@ namespace http { response.set_header(k, v); } - response.set_body(body.data(), body.size()); + + response.set_body( + body.data(), + body.size(), + false /* Don't overwrite any existing content-length header */ + ); auto data = response.build_response(); tls_io->send_raw(data.data(), data.size()); diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index b45b5e5a7645..5f30075e8981 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -1790,7 +1790,8 @@ namespace ccf .set_forwarding_required(endpoints::ForwardingRequired::Never) .install(); - auto get_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) { + // Redirects to endpoint for a single specific snapshot + auto find_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) { auto node_configuration_subsystem = this->context.get_subsystem(); if (!node_configuration_subsystem) @@ -1805,9 +1806,10 @@ namespace ccf const auto& snapshots_config = node_configuration_subsystem->get().node_config.snapshots; + size_t latest_idx = 0; auto latest_committed_snapshot = - snapshots::find_latest_committed_snapshot_in_directories( - snapshots_config.directory, snapshots_config.read_only_directory); + snapshots::find_latest_committed_snapshot_in_directory( + snapshots_config.directory, latest_idx); if (!latest_committed_snapshot.has_value()) { @@ -1818,23 +1820,239 @@ namespace ccf return; } - auto& [snapshot_dir, snapshot_path] = latest_committed_snapshot.value(); + const auto& snapshot_path = latest_committed_snapshot.value(); - LOG_DEBUG_FMT("Found snapshot: {}", snapshot_dir / snapshot_path); + LOG_DEBUG_FMT("Redirecting to snapshot: {}", snapshot_path); - ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); + auto redirect_url = fmt::format("/node/snapshot/{}", snapshot_path); + ctx.rpc_ctx->set_response_header( + ccf::http::headers::LOCATION, redirect_url); + ctx.rpc_ctx->set_response_status(HTTP_STATUS_PERMANENT_REDIRECT); + }; + make_command_endpoint( + "/snapshot", HTTP_GET, find_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); + + auto get_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) { + auto node_configuration_subsystem = + this->context.get_subsystem(); + if (!node_configuration_subsystem) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_INTERNAL_SERVER_ERROR, + ccf::errors::InternalError, + "NodeConfigurationSubsystem is not available"); + return; + } + + const auto& snapshots_config = + node_configuration_subsystem->get().node_config.snapshots; + + std::string snapshot_name; + std::string error; + if (!get_path_param( + ctx.rpc_ctx->get_request_path_params(), + "snapshot_name", + snapshot_name, + error)) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidResourceName, + std::move(error)); + return; + } + + fs::path snapshot_path = + fs::path(snapshots_config.directory) / snapshot_name; + + std::ifstream f(snapshot_path); + if (!f.good()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_NOT_FOUND, + ccf::errors::ResourceNotFound, + fmt::format( + "This node does not have a snapshot named {}", snapshot_name)); + return; + } + + LOG_DEBUG_FMT("Found snapshot: {}", snapshot_path.string()); + + f.seekg(0, f.end); + const auto total_size = (size_t)f.tellg(); + + ctx.rpc_ctx->set_response_header("accept-ranges", "bytes"); + + if (ctx.rpc_ctx->get_request_verb() == HTTP_HEAD) + { + ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); + LOG_INFO_FMT("!!! Set explicit content length"); + ctx.rpc_ctx->set_response_header( + ccf::http::headers::CONTENT_LENGTH, total_size); + return; + } + + size_t range_start = 0; + size_t range_end = total_size; + { + const auto range_header = ctx.rpc_ctx->get_request_header("range"); + if (range_header.has_value()) + { + auto [unit, ranges] = + ccf::nonstd::split_1(range_header.value(), "="); + if (unit != "bytes") + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + "Only 'bytes' is supported as a Range header unit"); + return; + } + + if (ranges.find(",") != std::string::npos) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + "Multiple ranges are not supported"); + return; + } + + auto [s_range_start, s_range_end] = + ccf::nonstd::split_1(ranges, "-"); + + if (!s_range_start.empty()) + { + { + const auto [p, ec] = std::from_chars( + s_range_start.begin(), s_range_start.end(), range_start); + if (ec != std::errc()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Unable to parse start of range value {} in {}", + s_range_start, + range_header.value())); + return; + } + } + if (range_start > total_size) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Start of range {} is larger than total file size {}", + range_start, + total_size)); + return; + } + + if (!s_range_end.empty()) + { + // Fully-specified range, like "X-Y" + { + const auto [p, ec] = std::from_chars( + s_range_end.begin(), s_range_end.end(), range_end); + if (ec != std::errc()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Unable to parse end of range value {} in {}", + s_range_end, + range_header.value())); + return; + } + } + + if (range_end > total_size) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "End of range {} is larger than total file size {}", + range_end, + total_size)); + return; + } + + if (range_end < range_start) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Invalid range: Start ({}) and end ({}) out of order", + range_start, + range_end)); + return; + } + } + else + { + // Else this is an open-ended range like "X-" + range_end = total_size; + } + } + else + { + if (!s_range_end.empty()) + { + // Negative range, like "-Y" + size_t offset; + const auto [p, ec] = std::from_chars( + s_range_end.begin(), s_range_end.end(), offset); + if (ec != std::errc()) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Unable to parse end of range offset value {} in {}", + s_range_end, + range_header.value())); + return; + } + + range_end = total_size; + range_start = range_end - offset; + } + } + } + } + + // Range end is included + const auto range_size = range_end - range_start + 1; + + // Read requested range into buffer + std::vector contents(range_size); + f.seekg(range_start); + f.read((char*)contents.data(), contents.size()); + f.close(); + + // Build successful response + ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); ctx.rpc_ctx->set_response_header( ccf::http::headers::CONTENT_TYPE, ccf::http::headervalues::contenttype::OCTET_STREAM); - ctx.rpc_ctx->set_response_header( - "x-ms-ccf-snapshot-filename", snapshot_path.string()); - - ctx.rpc_ctx->set_response_body( - files::slurp(snapshot_dir / snapshot_path)); + ctx.rpc_ctx->set_response_body(std::move(contents)); }; make_command_endpoint( - "/snapshot", HTTP_GET, get_snapshot, no_auth_required) + "/snapshot/{snapshot_name}", HTTP_HEAD, get_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); + make_command_endpoint( + "/snapshot/{snapshot_name}", HTTP_GET, get_snapshot, no_auth_required) .set_forwarding_required(endpoints::ForwardingRequired::Never) .set_openapi_hidden(true) .install(); diff --git a/src/snapshots/filenames.h b/src/snapshots/filenames.h index 56dea2912d50..bba801d6e4b5 100644 --- a/src/snapshots/filenames.h +++ b/src/snapshots/filenames.h @@ -162,38 +162,4 @@ namespace snapshots return latest_committed_snapshot_file_name; } - - std::optional> - find_latest_committed_snapshot_in_directories( - const fs::path& main_directory, - const std::optional& read_only_directory) - { - // Keep track of latest snapshot file in both directories - size_t latest_idx = 0; - - std::optional read_only_latest_committed_snapshot = std::nullopt; - if (read_only_directory.has_value()) - { - read_only_latest_committed_snapshot = - find_latest_committed_snapshot_in_directory( - read_only_directory.value(), latest_idx); - } - - auto main_latest_committed_snapshot = - find_latest_committed_snapshot_in_directory(main_directory, latest_idx); - - if (main_latest_committed_snapshot.has_value()) - { - return std::make_pair( - main_directory, main_latest_committed_snapshot.value()); - } - else if (read_only_latest_committed_snapshot.has_value()) - { - return std::make_pair( - read_only_directory.value(), - read_only_latest_committed_snapshot.value()); - } - - return std::nullopt; - } } \ No newline at end of file diff --git a/src/snapshots/snapshot_manager.h b/src/snapshots/snapshot_manager.h index bb65dca14683..dfaa1d15a42d 100644 --- a/src/snapshots/snapshot_manager.h +++ b/src/snapshots/snapshot_manager.h @@ -159,8 +159,34 @@ namespace snapshots std::optional> find_latest_committed_snapshot() { - return find_latest_committed_snapshot_in_directories( - snapshot_dir, read_snapshot_dir); + // Keep track of latest snapshot file in both directories + size_t latest_idx = 0; + + std::optional read_only_latest_committed_snapshot = + std::nullopt; + if (read_snapshot_dir.has_value()) + { + read_only_latest_committed_snapshot = + find_latest_committed_snapshot_in_directory( + read_snapshot_dir.value(), latest_idx); + } + + auto main_latest_committed_snapshot = + find_latest_committed_snapshot_in_directory(snapshot_dir, latest_idx); + + if (main_latest_committed_snapshot.has_value()) + { + return std::make_pair( + snapshot_dir, main_latest_committed_snapshot.value()); + } + else if (read_only_latest_committed_snapshot.has_value()) + { + return std::make_pair( + read_snapshot_dir.value(), + read_only_latest_committed_snapshot.value()); + } + + return std::nullopt; } void register_message_handlers( From 292091f34e3cefb78ddc4a3ebb92cef3e6cbdf40 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 21 Nov 2024 15:31:32 +0000 Subject: [PATCH 04/17] Test snapshot endpoints, from client side --- python/src/ccf/ledger.py | 11 ++++ src/node/rpc/node_frontend.h | 43 +++++++++++--- tests/e2e_operations.py | 105 ++++++++++++++++++++++++++++++----- tests/infra/clients.py | 18 +++++- 4 files changed, 155 insertions(+), 22 deletions(-) diff --git a/python/src/ccf/ledger.py b/python/src/ccf/ledger.py index 2b35b754e4e9..0eb8b20009d7 100644 --- a/python/src/ccf/ledger.py +++ b/python/src/ccf/ledger.py @@ -856,6 +856,17 @@ def get_len(self) -> int: return self._file_size +def latest_snapshot(snapshots_dir): + best_name, best_seqno = None, None + for s in os.listdir(snapshots_dir): + with ccf.ledger.Snapshot(os.path.join(snapshots_dir, s)) as snapshot: + snapshot_seqno = snapshot.get_public_domain().get_seqno() + if best_seqno is None or snapshot_seqno > best_seqno: + best_name = s + best_seqno = snapshot_seqno + return best_name + + class LedgerChunk: """ Class used to parse and iterate over :py:class:`ccf.ledger.Transaction` in a CCF ledger chunk. diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index 5f30075e8981..f2363013b2b9 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -1829,6 +1829,11 @@ namespace ccf ccf::http::headers::LOCATION, redirect_url); ctx.rpc_ctx->set_response_status(HTTP_STATUS_PERMANENT_REDIRECT); }; + make_command_endpoint( + "/snapshot", HTTP_HEAD, find_snapshot, no_auth_required) + .set_forwarding_required(endpoints::ForwardingRequired::Never) + .set_openapi_hidden(true) + .install(); make_command_endpoint( "/snapshot", HTTP_GET, find_snapshot, no_auth_required) .set_forwarding_required(endpoints::ForwardingRequired::Never) @@ -1868,7 +1873,7 @@ namespace ccf fs::path snapshot_path = fs::path(snapshots_config.directory) / snapshot_name; - std::ifstream f(snapshot_path); + std::ifstream f(snapshot_path, std::ios::binary); if (!f.good()) { ctx.rpc_ctx->set_error( @@ -1889,7 +1894,6 @@ namespace ccf if (ctx.rpc_ctx->get_request_verb() == HTTP_HEAD) { ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); - LOG_INFO_FMT("!!! Set explicit content length"); ctx.rpc_ctx->set_response_header( ccf::http::headers::CONTENT_LENGTH, total_size); return; @@ -1921,8 +1925,20 @@ namespace ccf return; } - auto [s_range_start, s_range_end] = - ccf::nonstd::split_1(ranges, "-"); + const auto segments = ccf::nonstd::split(ranges, "-"); + if (segments.size() != 2) + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + fmt::format( + "Invalid format, cannot parse range in {}", + range_header.value())); + return; + } + + const auto s_range_start = segments[0]; + const auto s_range_end = segments[1]; if (!s_range_start.empty()) { @@ -2026,12 +2042,19 @@ namespace ccf range_end = total_size; range_start = range_end - offset; } + else + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidHeaderValue, + "Invalid range: Must contain range-start or range-end"); + return; + } } } } - // Range end is included - const auto range_size = range_end - range_start + 1; + const auto range_size = range_end - range_start; // Read requested range into buffer std::vector contents(range_size); @@ -2040,11 +2063,17 @@ namespace ccf f.close(); // Build successful response - ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK); + ctx.rpc_ctx->set_response_status(HTTP_STATUS_PARTIAL_CONTENT); ctx.rpc_ctx->set_response_header( ccf::http::headers::CONTENT_TYPE, ccf::http::headervalues::contenttype::OCTET_STREAM); ctx.rpc_ctx->set_response_body(std::move(contents)); + + // Partial Content responses describe the current response in + // Content-Range + ctx.rpc_ctx->set_response_header( + "content-range", + fmt::format("bytes {}-{}/{}", range_start, range_end, total_size)); }; make_command_endpoint( "/snapshot/{snapshot_name}", HTTP_HEAD, get_snapshot, no_auth_required) diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index 7655432e00fb..31ea7160c5f4 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -209,6 +209,82 @@ def test_large_snapshot(network, args): ) +def test_snapshot_access(network, args): + primary, _ = network.find_primary() + + snapshots_dir = network.get_committed_snapshots(primary) + snapshot_name = ccf.ledger.latest_snapshot(snapshots_dir) + + with open(os.path.join(snapshots_dir, snapshot_name), "rb") as f: + snapshot_data = f.read() + + with primary.client() as c: + r = c.head("/node/snapshot", allow_redirects=False) + assert r.status_code == http.HTTPStatus.PERMANENT_REDIRECT.value, r + assert "location" in r.headers, r.headers + location = r.headers["location"] + assert location == f"/node/snapshot/{snapshot_name}" + LOG.warning(r.headers) + + r = c.head(location) + assert r.status_code == http.HTTPStatus.OK.value, r + assert r.headers["accept-ranges"] == "bytes", r.headers + total_size = int(r.headers["content-length"]) + + a = total_size // 3 + b = a * 2 + for start, end in [ + (0, None), + (0, total_size), + (0, a), + (a, a), + (a, b), + (b, b), + (b, total_size), + (b, None), + ]: + range_header_value = f"{start}-{'' if end is None else end}" + r = c.get(location, headers={"range": f"bytes={range_header_value}"}) + assert r.status_code == http.HTTPStatus.PARTIAL_CONTENT.value, r + + expected = snapshot_data[start:end] + actual = r.body.data() + assert ( + expected == actual + ), f"Binary mismatch, {len(expected)} vs {len(actual)}:\n{expected}\nvs\n{actual}" + + for negative_offset in [ + 1, + a, + b, + ]: + range_header_value = f"-{negative_offset}" + r = c.get(location, headers={"range": f"bytes={range_header_value}"}) + assert r.status_code == http.HTTPStatus.PARTIAL_CONTENT.value, r + + expected = snapshot_data[-negative_offset:] + actual = r.body.data() + assert ( + expected == actual + ), f"Binary mismatch, {len(expected)} vs {len(actual)}:\n{expected}\nvs\n{actual}" + + # Check error handling for invalid ranges + for invalid_range, err_msg in [ + (f"{a}-foo", "Unable to parse end of range value foo"), + (f"foo-foo", "Unable to parse start of range value foo"), + (f"foo-{b}", "Unable to parse start of range value foo"), + (f"{b}-{a}", "out of order"), + (f"0-{total_size + 1}", "larger than total file size"), + (f"-1-5", "Invalid format"), + (f"-", "Invalid range"), + (f"-foo", "Unable to parse end of range offset value foo"), + (f"", "Invalid format"), + ]: + r = c.get(location, headers={"range": f"bytes={invalid_range}"}) + assert r.status_code == http.HTTPStatus.BAD_REQUEST.value, r + assert err_msg in r.body.json()["error"]["message"], r + + def split_all_ledger_files_in_dir(input_dir, output_dir): # A ledger file can only be split at a seqno that contains a signature # (so that all files end on a signature that verifies their integrity). @@ -291,11 +367,13 @@ def run_file_operations(args): r = c.get("/node/network").body.json() assert r["service_data"] == service_data - test_save_committed_ledger_files(network, args) - test_parse_snapshot_file(network, args) - test_forced_ledger_chunk(network, args) + # TODO + # test_save_committed_ledger_files(network, args) + # test_parse_snapshot_file(network, args) + # test_forced_ledger_chunk(network, args) test_forced_snapshot(network, args) - test_large_snapshot(network, args) + # test_large_snapshot(network, args) + test_snapshot_access(network, args) primary, _ = network.find_primary() # Scoped transactions are not handled by historical range queries @@ -652,13 +730,14 @@ def run_cose_signatures_config_check(args): def run(args): - run_max_uncommitted_tx_count(args) + # TODO + # run_max_uncommitted_tx_count(args) run_file_operations(args) - run_tls_san_checks(args) - run_config_timeout_check(args) - run_configuration_file_checks(args) - run_pid_file_check(args) - run_preopen_readiness_check(args) - run_sighup_check(args) - run_service_subject_name_check(args) - run_cose_signatures_config_check(args) + # run_tls_san_checks(args) + # run_config_timeout_check(args) + # run_configuration_file_checks(args) + # run_pid_file_check(args) + # run_preopen_readiness_check(args) + # run_sighup_check(args) + # run_service_subject_name_check(args) + # run_cose_signatures_config_check(args) diff --git a/tests/infra/clients.py b/tests/infra/clients.py index 76247ef64548..6ac10b6d6dfa 100644 --- a/tests/infra/clients.py +++ b/tests/infra/clients.py @@ -162,7 +162,13 @@ def __str__(self): if self.headers: string += f" {truncate(str(self.headers), max_len=25)}" if self.body is not None: - string += escape_loguru_tags(f' {truncate(f"{self.body}")}') + if ( + "content-type" in self.headers + and self.headers["content-type"] == "application/octet-stream" + ): + string += f"" + else: + string += escape_loguru_tags(f' {truncate(f"{self.body}")}') return string @@ -267,7 +273,15 @@ def __str__(self): status_color = ( "red" if status_category in (4, 5) else "yellow" if redirect else "green" ) - body_s = escape_loguru_tags(truncate(str(self.body))) + + if ( + "content-type" in self.headers + and self.headers["content-type"] == "application/octet-stream" + ): + body_s = f"" + else: + body_s = escape_loguru_tags(truncate(str(self.body))) + # Body can't end with a \, or it will escape the loguru closing tag if len(body_s) > 0 and body_s[-1] == "\\": body_s += " " From 2494358dc55290c8d02930d625fa2ad0a85893a6 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 22 Nov 2024 16:26:13 +0000 Subject: [PATCH 05/17] Test nonstd.cpp! --- CMakeLists.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 65992691c8b3..de3b287194a9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -59,7 +59,7 @@ function(message) endfunction() option(PROFILE_TESTS "Profile tests" OFF) -set(PYTHON python3) +set(PYTHON unbuffer python3) set(DISTRIBUTE_PERF_TESTS "" @@ -251,7 +251,7 @@ endif() target_link_libraries( cchost PRIVATE uv ${TLS_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT} - ${LINK_LIBCXX} ccfcrypto.host + ${LINK_LIBCXX} ccfcrypto.host curl http_parser.host ) install(TARGETS cchost DESTINATION bin) @@ -742,6 +742,7 @@ if(BUILD_TESTS) ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/contiguous_set.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/unit_strings.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/dl_list.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/nonstd.cpp ) target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT}) From 99468017f1ed95ad86f2ac35a1899250621a603d Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 22 Nov 2024 16:26:30 +0000 Subject: [PATCH 06/17] Add nonstd::trim --- include/ccf/ds/nonstd.h | 8 ++++++++ src/ds/test/nonstd.cpp | 16 ++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/include/ccf/ds/nonstd.h b/include/ccf/ds/nonstd.h index 5788ce97deaf..c77148cfe6c2 100644 --- a/include/ccf/ds/nonstd.h +++ b/include/ccf/ds/nonstd.h @@ -185,6 +185,14 @@ namespace ccf::nonstd }); } + static inline std::string_view trim( + std::string_view s, std::string_view trim_chars = " \t\r\n") + { + const auto start = std::min(s.find_first_not_of(trim_chars), s.size()); + const auto end = std::min(s.find_last_not_of(trim_chars) + 1, s.size()); + return s.substr(start, end - start); + } + /// Iterate through tuple, calling functor on each element template static void tuple_for_each(const std::tuple& t, const F& f) diff --git a/src/ds/test/nonstd.cpp b/src/ds/test/nonstd.cpp index b20c84716e95..8548df72dbac 100644 --- a/src/ds/test/nonstd.cpp +++ b/src/ds/test/nonstd.cpp @@ -265,3 +265,19 @@ TEST_CASE("rsplit" * doctest::test_suite("nonstd")) } } } + +TEST_CASE("trim" * doctest::test_suite("nonstd")) +{ + REQUIRE(ccf::nonstd::trim(" hello world ") == "hello world"); + REQUIRE( + ccf::nonstd::trim(" \r\n\t\nhello world\n\n\r\t\t\n\t \n\t") == + "hello world"); + REQUIRE(ccf::nonstd::trim("..hello..") == "..hello.."); + REQUIRE(ccf::nonstd::trim("..hello..", ".") == "hello"); + + REQUIRE(ccf::nonstd::trim("hello") == "hello"); + REQUIRE(ccf::nonstd::trim(" h") == "h"); + REQUIRE(ccf::nonstd::trim("h ") == "h"); + REQUIRE(ccf::nonstd::trim(" ") == ""); + REQUIRE(ccf::nonstd::trim("") == ""); +} \ No newline at end of file From 6bd65fd9dd54c8f2ee009dd776fcf7702db5b44b Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 27 Nov 2024 15:14:27 +0000 Subject: [PATCH 07/17] First-pass hack of actually fetching a snapshot, via curl, from a peer during startup --- src/host/main.cpp | 11 ++ src/http/http_builder.h | 2 - src/snapshots/fetch.h | 274 ++++++++++++++++++++++++++++++++++++++++ tests/e2e_operations.py | 4 + 4 files changed, 289 insertions(+), 2 deletions(-) create mode 100644 src/snapshots/fetch.h diff --git a/src/host/main.cpp b/src/host/main.cpp index 8b8b844dbb8d..ace4195c7b8f 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -24,6 +24,7 @@ #include "process_launcher.h" #include "rpc_connections.h" #include "sig_term.h" +#include "snapshots/fetch.h" #include "snapshots/snapshot_manager.h" #include "ticker.h" #include "time_updater.h" @@ -681,6 +682,16 @@ int main(int argc, char** argv) std::vector startup_snapshot = {}; + if (config.command.type == StartType::Join) + { + // TODO: Decide whether to use it + auto [snapshot_name, copied_snapshot] = + snapshots::fetch_from_peer(config.command.join.target_rpc_address); + + files::dump( + copied_snapshot, fmt::format("copy_of_{}.foo", snapshot_name)); + } + if ( config.command.type == StartType::Join || config.command.type == StartType::Recover) diff --git a/src/http/http_builder.h b/src/http/http_builder.h index ae0288aafb97..763aeeb188ad 100644 --- a/src/http/http_builder.h +++ b/src/http/http_builder.h @@ -87,7 +87,6 @@ namespace http void set_body( const uint8_t* b, size_t s, bool overwrite_content_length = true) { - LOG_INFO_FMT("!!! Called set_body (raw)"); body = b; body_size = s; @@ -102,7 +101,6 @@ namespace http void set_body(const std::string& s, bool overwrite_content_length = true) { - LOG_INFO_FMT("!!! Called set_body (string)"); body = (uint8_t*)s.data(); body_size = s.size(); diff --git a/src/snapshots/fetch.h b/src/snapshots/fetch.h new file mode 100644 index 000000000000..070d6e9f4681 --- /dev/null +++ b/src/snapshots/fetch.h @@ -0,0 +1,274 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the Apache 2.0 License. +#pragma once + +#include "ccf/ds/logger.h" +#include "ccf/ds/nonstd.h" +#include "ccf/rest_verb.h" +#include "http/http_builder.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace snapshots +{ + // Using curl 7.68.0, so missing niceties like curl_easy_header + + using HeaderMap = std::unordered_map; + size_t append_header(char* buffer, size_t size, size_t nitems, void* userdata) + { + HeaderMap& headers = *(HeaderMap*)userdata; + + if (size != 1) + { + LOG_FAIL_FMT( + "Unexpected value in curl HEADERFUNCTION callback: size = {}", size); + return 0; + } + + const std::string_view header = + ccf::nonstd::trim(std::string_view(buffer, nitems)); + + // Ignore HTTP status line + if (!header.starts_with("HTTP/1.1")) + { + const auto [field, value] = ccf::nonstd::split_1(header, ": "); + if (!value.empty()) + { + headers[std::string(field)] = ccf::nonstd::trim(value); + } + else + { + LOG_INFO_FMT("Ignoring invalid-looking HTTP Header '{}'", header); + } + } + + return nitems * size; + } + + using BodyHandler = std::function&)>; + size_t curl_write_callback( + char* ptr, size_t size, size_t nmemb, void* user_data) + { + BodyHandler& body_handler = *(BodyHandler*)user_data; + + if (size != 1) + { + LOG_FAIL_FMT( + "Unexpected value in curl WRITEFUNCTION callback: size = {}", size); + return 0; + } + + std::span data((const uint8_t*)ptr, size * nmemb); + + body_handler(data); + + return size * nmemb; + } + + struct SimpleHTTPRequest + { + ccf::RESTVerb method; + std::string url; + HeaderMap headers; + BodyHandler body_handler = nullptr; + }; + + struct SimpleHTTPResponse + { + long status_code; + HeaderMap headers; + }; + + static inline SimpleHTTPResponse make_curl_request( + const SimpleHTTPRequest& request) + { + CURL* curl; + + curl = curl_easy_init(); + if (!curl) + { + throw std::runtime_error("Error initialising curl easy request"); + } + + curl_easy_setopt(curl, CURLOPT_URL, request.url.c_str()); + if (request.method == HTTP_HEAD) + { + curl_easy_setopt(curl, CURLOPT_NOBODY, 1L); + } + else if (request.method == HTTP_GET) + { + curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); + } + else + { + throw std::logic_error( + fmt::format("Unsupported HTTP method: {}", request.method.c_str())); + } + + SimpleHTTPResponse response; + curl_easy_setopt(curl, CURLOPT_HEADERDATA, &response.headers); + curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, append_header); + + // TODO: Should have a cert for them, right? + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); + + struct curl_slist* list = nullptr; + for (const auto& [k, v] : request.headers) + { + list = curl_slist_append(list, fmt::format("{}: {}", k, v).c_str()); + } + + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list); + + if (request.body_handler != nullptr) + { + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &request.body_handler); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_callback); + } + + LOG_INFO_FMT( + "!!! Sending curl request {} {}", request.method.c_str(), request.url); + + auto res = curl_easy_perform(curl); + + // TODO: Handle errors + LOG_INFO_FMT("!!! Curl perform result is {}", res); + + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response.status_code); + + curl_slist_free_all(list); + curl_easy_cleanup(curl); + + return response; + } + + struct SnapshotResponse + { + std::string snapshot_name; + std::vector snapshot_data; + }; + + static SnapshotResponse fetch_from_peer(const std::string& peer_address) + { + const auto initial_url = + fmt::format("https://{}/node/snapshot", peer_address); + + SimpleHTTPRequest initial_request; + initial_request.method = HTTP_HEAD; + initial_request.url = initial_url; + const auto initial_response = make_curl_request(initial_request); + if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT) + { + LOG_FAIL_FMT("TODO: Expected permanent redirect response"); + } + + auto location_it = initial_response.headers.find("location"); + if (location_it == initial_response.headers.end()) + { + LOG_FAIL_FMT("TODO: Missing Location header"); + } + + LOG_INFO_FMT("!!! Redirected to {}", location_it->second); + + const auto snapshot_url = + fmt::format("https://{}{}", peer_address, location_it->second); + + SimpleHTTPRequest snapshot_size_request; + snapshot_size_request.method = HTTP_HEAD; + snapshot_size_request.url = snapshot_url; + const auto snapshot_size_response = + make_curl_request(snapshot_size_request); + if (snapshot_size_response.status_code != HTTP_STATUS_OK) + { + LOG_FAIL_FMT("TODO: Expected OK response"); + } + + auto content_size_it = + snapshot_size_response.headers.find(ccf::http::headers::CONTENT_LENGTH); + if (content_size_it == snapshot_size_response.headers.end()) + { + LOG_FAIL_FMT("TODO: Missing content-size header"); + } + + LOG_INFO_FMT( + "!!! Parsing content size header: {}", content_size_it->second); + size_t content_size; + const auto& content_size_s = content_size_it->second; + const auto [p, ec] = std::from_chars( + content_size_s.data(), + content_size_s.data() + content_size_s.size(), + content_size); + if (ec != std::errc()) + { + LOG_FAIL_FMT("TODO: Invalid content size!?"); + } + + LOG_INFO_FMT("!!! Content size is {}", content_size); + + std::vector snapshot(content_size); + { + // TODO: Decide sensible chunk size, 4MB? + constexpr size_t range_size = 4 * 1024; + + auto range_start = 0; + auto range_end = std::min(content_size, range_size); + + while (true) + { + // TODO: Copy response body back + SimpleHTTPRequest snapshot_range_request; + snapshot_range_request.method = HTTP_GET; + snapshot_range_request.url = snapshot_url; + snapshot_range_request.headers["range"] = + fmt::format("bytes={}-{}", range_start, range_end); + + snapshot_range_request.body_handler = [&](const auto& data) { + const auto range_size = range_end - range_start; + if (data.size() != range_size) + { + LOG_FAIL_FMT( + "Asked for a range from {} to {} ({} bytes). Received a response " + "of {} bytes", + range_start, + range_end, + range_size, + data.size()); + } + + LOG_INFO_FMT( + "!!! Copying {} bytes into snapshot, starting at {}", + range_size, + range_start); + memcpy(snapshot.data() + range_start, data.data(), data.size()); + }; + + const auto range_response = make_curl_request(snapshot_range_request); + + if (range_response.status_code != HTTP_STATUS_PARTIAL_CONTENT) + { + LOG_FAIL_FMT( + "Got error HTTP response: {}", range_response.status_code); + } + + if (range_end == content_size) + { + break; + } + + range_start = range_end; + range_end = std::min(content_size, range_start + range_size); + } + } + + const auto url_components = ccf::nonstd::split(snapshot_url, "/"); + const std::string snapshot_name(url_components.back()); + + return SnapshotResponse{snapshot_name, std::move(snapshot)}; + } +} \ No newline at end of file diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index 31ea7160c5f4..3e54d4056998 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -212,6 +212,10 @@ def test_large_snapshot(network, args): def test_snapshot_access(network, args): primary, _ = network.find_primary() + # TODO: Hacked in here to check copying + new_node = network.create_node("local://localhost") + network.join_node(new_node, args.package, args) + snapshots_dir = network.get_committed_snapshots(primary) snapshot_name = ccf.ledger.latest_snapshot(snapshots_dir) From 42af3a7f12d867dc1f9778d2db27513d3ef7ae76 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 10 Dec 2024 14:22:38 +0000 Subject: [PATCH 08/17] Interesting - careful about HEAD errors! --- src/http/http_rpc_context.h | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/http/http_rpc_context.h b/src/http/http_rpc_context.h index 6f663dd16691..b1b15f1c0927 100644 --- a/src/http/http_rpc_context.h +++ b/src/http/http_rpc_context.h @@ -215,19 +215,36 @@ namespace http return responder; } + template + void _set_response_body(T&& body) + { + // HEAD responses must not contain a body - clients will ignore it + if (verb != HTTP_HEAD) + { + if constexpr (std::is_same_v) + { + response_body = std::vector(body.begin(), body.end()); + } + else + { + response_body = std::forward(body); + } + } + } + virtual void set_response_body(const std::vector& body) override { - response_body = body; + _set_response_body(body); } virtual void set_response_body(std::vector&& body) override { - response_body = std::move(body); + _set_response_body(std::move(body)); } virtual void set_response_body(std::string&& body) override { - response_body = std::vector(body.begin(), body.end()); + _set_response_body(std::move(body)); } virtual const std::vector& get_response_body() const override From d9e9bedbfe096723cea10b2d64de48dceb0c9417 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 10 Dec 2024 14:23:05 +0000 Subject: [PATCH 09/17] Anyway: Add and test ?since parameter, to fetch minimum snapshot --- python/src/ccf/ledger.py | 13 +++++++++++++ src/node/rpc/node_frontend.h | 32 +++++++++++++++++++++++++++++++- tests/e2e_operations.py | 23 +++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/python/src/ccf/ledger.py b/python/src/ccf/ledger.py index 0eb8b20009d7..6c09e759ca5b 100644 --- a/python/src/ccf/ledger.py +++ b/python/src/ccf/ledger.py @@ -130,6 +130,19 @@ def range_from_filename(filename: str) -> Tuple[int, Optional[int]]: raise ValueError(f"Could not read seqno range from ledger file {filename}") +def snapshot_index_from_filename(filename: str) -> Tuple[int, int]: + elements = ( + os.path.basename(filename) + .replace(COMMITTED_FILE_SUFFIX, "") + .replace("snapshot_", "") + .split("_") + ) + if len(elements) == 2: + return (int(elements[0]), int(elements[1])) + else: + raise ValueError(f"Could not read snapshot index from file name {filename}") + + class GcmHeader: _gcm_tag = ["\0"] * GCM_SIZE_TAG _gcm_iv = ["\0"] * GCM_SIZE_IV diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index 5478972f818f..c660186b5570 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -1790,6 +1790,7 @@ namespace ccf .set_forwarding_required(endpoints::ForwardingRequired::Never) .install(); + static constexpr auto snapshot_since_param_key = "since"; // Redirects to endpoint for a single specific snapshot auto find_snapshot = [this](ccf::endpoints::CommandEndpointContext& ctx) { auto node_configuration_subsystem = @@ -1807,6 +1808,30 @@ namespace ccf node_configuration_subsystem->get().node_config.snapshots; size_t latest_idx = 0; + { + // Get latest_idx from query param, if present + const auto parsed_query = + http::parse_query(ctx.rpc_ctx->get_request_query()); + + std::string error_reason; + auto snapshot_since = http::get_query_value_opt( + parsed_query, snapshot_since_param_key, error_reason); + + if (snapshot_since.has_value()) + { + if (error_reason != "") + { + ctx.rpc_ctx->set_error( + HTTP_STATUS_BAD_REQUEST, + ccf::errors::InvalidQueryParameterValue, + std::move(error_reason)); + return; + } + latest_idx = snapshot_since.value(); + } + } + + const auto orig_latest = latest_idx; auto latest_committed_snapshot = snapshots::find_latest_committed_snapshot_in_directory( snapshots_config.directory, latest_idx); @@ -1816,7 +1841,8 @@ namespace ccf ctx.rpc_ctx->set_error( HTTP_STATUS_NOT_FOUND, ccf::errors::ResourceNotFound, - "This node has no committed snapshots"); + fmt::format( + "This node has no committed snapshots since {}", orig_latest)); return; } @@ -1832,11 +1858,15 @@ namespace ccf make_command_endpoint( "/snapshot", HTTP_HEAD, find_snapshot, no_auth_required) .set_forwarding_required(endpoints::ForwardingRequired::Never) + .add_query_parameter( + snapshot_since_param_key, ccf::endpoints::OptionalParameter) .set_openapi_hidden(true) .install(); make_command_endpoint( "/snapshot", HTTP_GET, find_snapshot, no_auth_required) .set_forwarding_required(endpoints::ForwardingRequired::Never) + .add_query_parameter( + snapshot_since_param_key, ccf::endpoints::OptionalParameter) .set_openapi_hidden(true) .install(); diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index 3e54d4056998..11d0e0833707 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -218,6 +218,7 @@ def test_snapshot_access(network, args): snapshots_dir = network.get_committed_snapshots(primary) snapshot_name = ccf.ledger.latest_snapshot(snapshots_dir) + snapshot_index, _ = ccf.ledger.snapshot_index_from_filename(snapshot_name) with open(os.path.join(snapshots_dir, snapshot_name), "rb") as f: snapshot_data = f.read() @@ -230,6 +231,28 @@ def test_snapshot_access(network, args): assert location == f"/node/snapshot/{snapshot_name}" LOG.warning(r.headers) + for since, expected in ( + (0, location), + (1, location), + (snapshot_index // 2, location), + (snapshot_index - 1, location), + (snapshot_index, None), + (snapshot_index + 1, None), + ): + for method in ("GET", "HEAD"): + r = c.call( + f"/node/snapshot?since={since}", + allow_redirects=False, + http_verb=method, + ) + if expected is None: + assert r.status_code == http.HTTPStatus.NOT_FOUND, r + else: + assert r.status_code == http.HTTPStatus.PERMANENT_REDIRECT.value, r + assert "location" in r.headers, r.headers + actual = r.headers["location"] + assert actual == expected + r = c.head(location) assert r.status_code == http.HTTPStatus.OK.value, r assert r.headers["accept-ranges"] == "bytes", r.headers From 3473a62b55ab16ae2116c665d8163516a9f7775d Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 10 Dec 2024 15:02:35 +0000 Subject: [PATCH 10/17] Pass ?since when fetching, and only take newer snapshots --- src/host/main.cpp | 72 +++++++++++++++++++++++++++++-------------- src/snapshots/fetch.h | 14 ++++++--- 2 files changed, 59 insertions(+), 27 deletions(-) diff --git a/src/host/main.cpp b/src/host/main.cpp index a64d40c7384d..b9628f3f45b6 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -685,36 +685,62 @@ int main(int argc, char** argv) std::vector startup_snapshot = {}; - if (config.command.type == StartType::Join) - { - // TODO: Decide whether to use it - auto [snapshot_name, copied_snapshot] = - snapshots::fetch_from_peer(config.command.join.target_rpc_address); - - files::dump( - copied_snapshot, fmt::format("copy_of_{}.foo", snapshot_name)); - } - if ( config.command.type == StartType::Join || config.command.type == StartType::Recover) { - auto latest_committed_snapshot = - snapshots.find_latest_committed_snapshot(); - if (latest_committed_snapshot.has_value()) - { - auto& [snapshot_dir, snapshot_file] = latest_committed_snapshot.value(); - startup_snapshot = files::slurp(snapshot_dir / snapshot_file); + auto latest_local_snapshot = snapshots.find_latest_committed_snapshot(); - LOG_INFO_FMT( - "Found latest snapshot file: {} (size: {})", - snapshot_dir / snapshot_file, - startup_snapshot.size()); + if (config.command.type == StartType::Join) + { + // Try to fetch a recent snapshot from peer + const size_t latest_local_idx = latest_local_snapshot.has_value() ? + snapshots::get_snapshot_idx_from_file_name( + latest_local_snapshot->second) : + 0; + auto latest_peer_snapshot = snapshots::fetch_from_peer( + config.command.join.target_rpc_address, latest_local_idx); + + if (latest_peer_snapshot.has_value()) + { + LOG_INFO_FMT( + "Received snapshot {} from peer (size: {}) - writing this to disk " + "and using for join startup", + latest_peer_snapshot->snapshot_name, + latest_peer_snapshot->snapshot_data.size()); + + const auto dst_path = fs::path(config.snapshots.directory) / + fs::path(latest_peer_snapshot->snapshot_name); + if (files::exists(dst_path)) + { + LOG_FATAL_FMT( + "Unable to write peer snapshot - already have a file at {}. " + "Exiting.", + dst_path); + return static_cast(CLI::ExitCodes::FileError); + } + files::dump(latest_peer_snapshot->snapshot_data, dst_path); + startup_snapshot = latest_peer_snapshot->snapshot_data; + } } - else + + if (startup_snapshot.empty()) { - LOG_INFO_FMT( - "No snapshot found: Node will replay all historical transactions"); + if (latest_local_snapshot.has_value()) + { + auto& [snapshot_dir, snapshot_file] = latest_local_snapshot.value(); + startup_snapshot = files::slurp(snapshot_dir / snapshot_file); + + LOG_INFO_FMT( + "Found latest local snapshot file: {} (size: {})", + snapshot_dir / snapshot_file, + startup_snapshot.size()); + } + else + { + LOG_INFO_FMT( + "No snapshot found: Node will replay all historical transactions"); + } } } diff --git a/src/snapshots/fetch.h b/src/snapshots/fetch.h index 070d6e9f4681..fc038c429b53 100644 --- a/src/snapshots/fetch.h +++ b/src/snapshots/fetch.h @@ -154,16 +154,22 @@ namespace snapshots std::vector snapshot_data; }; - static SnapshotResponse fetch_from_peer(const std::string& peer_address) + static std::optional fetch_from_peer( + const std::string& peer_address, size_t latest_local_snapshot) { - const auto initial_url = - fmt::format("https://{}/node/snapshot", peer_address); + const auto initial_url = fmt::format( + "https://{}/node/snapshot?since={}", peer_address, latest_local_snapshot); SimpleHTTPRequest initial_request; initial_request.method = HTTP_HEAD; initial_request.url = initial_url; const auto initial_response = make_curl_request(initial_request); - if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT) + if (initial_response.status_code == HTTP_STATUS_NOT_FOUND) + { + LOG_INFO_FMT("Peer has no snapshot newer than {}", latest_local_snapshot); + return std::nullopt; + } + else if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT) { LOG_FAIL_FMT("TODO: Expected permanent redirect response"); } From b3bebe53c3a073f793de0151abbab188d8206fd1 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 10 Dec 2024 16:33:40 +0000 Subject: [PATCH 11/17] TODOs, return codes --- src/host/main.cpp | 4 +- src/snapshots/fetch.h | 230 +++++++++++++++++++++++++++--------------- 2 files changed, 149 insertions(+), 85 deletions(-) diff --git a/src/host/main.cpp b/src/host/main.cpp index b9628f3f45b6..a1e41e07c4ea 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -699,7 +699,9 @@ int main(int argc, char** argv) latest_local_snapshot->second) : 0; auto latest_peer_snapshot = snapshots::fetch_from_peer( - config.command.join.target_rpc_address, latest_local_idx); + config.command.join.target_rpc_address, + config.command.service_certificate_file, + latest_local_idx); if (latest_peer_snapshot.has_value()) { diff --git a/src/snapshots/fetch.h b/src/snapshots/fetch.h index fc038c429b53..edbed2428ff4 100644 --- a/src/snapshots/fetch.h +++ b/src/snapshots/fetch.h @@ -15,6 +15,36 @@ #include #include +#define CHECK_CURL_EASY(fn, ...) \ + do \ + { \ + const auto res = fn(__VA_ARGS__); \ + if (res != CURLE_OK) \ + { \ + throw std::runtime_error(fmt::format( \ + "Error calling " #fn ": {} ({})", res, curl_easy_strerror(res))); \ + } \ + } while (0) + +#define CHECK_CURL_EASY_SETOPT(handle, info, arg) \ + CHECK_CURL_EASY(curl_easy_setopt, handle, info, arg) +#define CHECK_CURL_EASY_GETINFO(handle, info, arg) \ + CHECK_CURL_EASY(curl_easy_getinfo, handle, info, arg) + +#define EXPECT_HTTP_RESPONSE_STATUS(request, response, expected) \ + do \ + { \ + if (response.status_code != expected) \ + { \ + throw std::runtime_error(fmt::format( \ + "Expected {} response from {} {}, instead received {}", \ + ccf::http_status_str(expected), \ + request.method.c_str(), \ + request.url, \ + response.status_code)); \ + } \ + } while (0) + namespace snapshots { // Using curl 7.68.0, so missing niceties like curl_easy_header @@ -76,6 +106,7 @@ namespace snapshots ccf::RESTVerb method; std::string url; HeaderMap headers; + std::string ca_path; BodyHandler body_handler = nullptr; }; @@ -96,14 +127,14 @@ namespace snapshots throw std::runtime_error("Error initialising curl easy request"); } - curl_easy_setopt(curl, CURLOPT_URL, request.url.c_str()); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_URL, request.url.c_str()); if (request.method == HTTP_HEAD) { - curl_easy_setopt(curl, CURLOPT_NOBODY, 1L); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_NOBODY, 1L); } else if (request.method == HTTP_GET) { - curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HTTPGET, 1L); } else { @@ -112,35 +143,38 @@ namespace snapshots } SimpleHTTPResponse response; - curl_easy_setopt(curl, CURLOPT_HEADERDATA, &response.headers); - curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, append_header); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERDATA, &response.headers); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HEADERFUNCTION, append_header); - // TODO: Should have a cert for them, right? - curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(curl, CURLOPT_CAINFO, request.ca_path.c_str()); - struct curl_slist* list = nullptr; + curl_slist* list = nullptr; for (const auto& [k, v] : request.headers) { list = curl_slist_append(list, fmt::format("{}: {}", k, v).c_str()); } - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_HTTPHEADER, list); if (request.body_handler != nullptr) { - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &request.body_handler); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_write_callback); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEDATA, &request.body_handler); + CHECK_CURL_EASY_SETOPT(curl, CURLOPT_WRITEFUNCTION, curl_write_callback); } - LOG_INFO_FMT( - "!!! Sending curl request {} {}", request.method.c_str(), request.url); + LOG_TRACE_FMT( + "Sending curl request {} {}", request.method.c_str(), request.url); - auto res = curl_easy_perform(curl); + CHECK_CURL_EASY(curl_easy_perform, curl); - // TODO: Handle errors - LOG_INFO_FMT("!!! Curl perform result is {}", res); + CHECK_CURL_EASY_GETINFO( + curl, CURLINFO_RESPONSE_CODE, &response.status_code); - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response.status_code); + LOG_TRACE_FMT( + "{} {} returned {}", + request.method.c_str(), + request.url, + response.status_code); curl_slist_free_all(list); curl_easy_cleanup(curl); @@ -155,100 +189,131 @@ namespace snapshots }; static std::optional fetch_from_peer( - const std::string& peer_address, size_t latest_local_snapshot) + const std::string& peer_address, + const std::string& path_to_peer_cert, + size_t latest_local_snapshot) { - const auto initial_url = fmt::format( - "https://{}/node/snapshot?since={}", peer_address, latest_local_snapshot); - - SimpleHTTPRequest initial_request; - initial_request.method = HTTP_HEAD; - initial_request.url = initial_url; - const auto initial_response = make_curl_request(initial_request); - if (initial_response.status_code == HTTP_STATUS_NOT_FOUND) - { - LOG_INFO_FMT("Peer has no snapshot newer than {}", latest_local_snapshot); - return std::nullopt; - } - else if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT) - { - LOG_FAIL_FMT("TODO: Expected permanent redirect response"); - } - - auto location_it = initial_response.headers.find("location"); - if (location_it == initial_response.headers.end()) + // Make initial request, which returns a redirect response to specific + // snapshot + std::string snapshot_url; { - LOG_FAIL_FMT("TODO: Missing Location header"); - } + const auto initial_url = fmt::format( + "https://{}/node/snapshot?since={}", + peer_address, + latest_local_snapshot); + + SimpleHTTPRequest initial_request; + initial_request.method = HTTP_HEAD; + initial_request.url = initial_url; + initial_request.ca_path = path_to_peer_cert; + + const auto initial_response = make_curl_request(initial_request); + if (initial_response.status_code == HTTP_STATUS_NOT_FOUND) + { + LOG_INFO_FMT( + "Peer has no snapshot newer than {}", latest_local_snapshot); + return std::nullopt; + } + else if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT) + { + EXPECT_HTTP_RESPONSE_STATUS( + initial_request, initial_response, HTTP_STATUS_PERMANENT_REDIRECT); + } - LOG_INFO_FMT("!!! Redirected to {}", location_it->second); + auto location_it = + initial_response.headers.find(ccf::http::headers::LOCATION); + if (location_it == initial_response.headers.end()) + { + throw std::runtime_error(fmt::format( + "Expected {} header in redirect response from {} {}, none found", + ccf::http::headers::LOCATION, + initial_request.method.c_str(), + initial_request.url)); + } - const auto snapshot_url = - fmt::format("https://{}{}", peer_address, location_it->second); + LOG_TRACE_FMT("Snapshot fetch redirected to {}", location_it->second); - SimpleHTTPRequest snapshot_size_request; - snapshot_size_request.method = HTTP_HEAD; - snapshot_size_request.url = snapshot_url; - const auto snapshot_size_response = - make_curl_request(snapshot_size_request); - if (snapshot_size_response.status_code != HTTP_STATUS_OK) - { - LOG_FAIL_FMT("TODO: Expected OK response"); + snapshot_url = + fmt::format("https://{}{}", peer_address, location_it->second); } - auto content_size_it = - snapshot_size_response.headers.find(ccf::http::headers::CONTENT_LENGTH); - if (content_size_it == snapshot_size_response.headers.end()) - { - LOG_FAIL_FMT("TODO: Missing content-size header"); - } - - LOG_INFO_FMT( - "!!! Parsing content size header: {}", content_size_it->second); + // Make follow-up request to redirected URL, to fetch total content size size_t content_size; - const auto& content_size_s = content_size_it->second; - const auto [p, ec] = std::from_chars( - content_size_s.data(), - content_size_s.data() + content_size_s.size(), - content_size); - if (ec != std::errc()) { - LOG_FAIL_FMT("TODO: Invalid content size!?"); + SimpleHTTPRequest snapshot_size_request; + snapshot_size_request.method = HTTP_HEAD; + snapshot_size_request.url = snapshot_url; + snapshot_size_request.ca_path = path_to_peer_cert; + + const auto snapshot_size_response = + make_curl_request(snapshot_size_request); + + EXPECT_HTTP_RESPONSE_STATUS( + snapshot_size_request, snapshot_size_response, HTTP_STATUS_OK); + + auto content_size_it = + snapshot_size_response.headers.find(ccf::http::headers::CONTENT_LENGTH); + if (content_size_it == snapshot_size_response.headers.end()) + { + throw std::runtime_error(fmt::format( + "Expected {} header in redirect response from {} {}, none found", + ccf::http::headers::CONTENT_LENGTH, + snapshot_size_request.method.c_str(), + snapshot_size_request.url)); + } + + const auto& content_size_s = content_size_it->second; + const auto [p, ec] = std::from_chars( + content_size_s.data(), + content_size_s.data() + content_size_s.size(), + content_size); + if (ec != std::errc()) + { + throw std::runtime_error(fmt::format( + "Invalid {} header in redirect response from {} {}: {}", + ccf::http::headers::CONTENT_LENGTH, + snapshot_size_request.method.c_str(), + snapshot_size_request.url, + ec)); + } } - LOG_INFO_FMT("!!! Content size is {}", content_size); + // Fetch 4MB chunks at a time + constexpr size_t range_size = 4 * 1024 * 1024; + LOG_TRACE_FMT( + "Preparing to fetch {}-byte snapshot from peer, {} bytes per-request", + content_size, + range_size); std::vector snapshot(content_size); - { - // TODO: Decide sensible chunk size, 4MB? - constexpr size_t range_size = 4 * 1024; + { auto range_start = 0; auto range_end = std::min(content_size, range_size); while (true) { - // TODO: Copy response body back SimpleHTTPRequest snapshot_range_request; snapshot_range_request.method = HTTP_GET; snapshot_range_request.url = snapshot_url; snapshot_range_request.headers["range"] = fmt::format("bytes={}-{}", range_start, range_end); + snapshot_range_request.ca_path = path_to_peer_cert; snapshot_range_request.body_handler = [&](const auto& data) { const auto range_size = range_end - range_start; if (data.size() != range_size) { - LOG_FAIL_FMT( - "Asked for a range from {} to {} ({} bytes). Received a response " - "of {} bytes", - range_start, - range_end, + throw std::runtime_error(fmt::format( + "Requested {} bytes from {} {}, received {} in response", range_size, - data.size()); + snapshot_range_request.method.c_str(), + snapshot_range_request.url, + data.size())); } - LOG_INFO_FMT( - "!!! Copying {} bytes into snapshot, starting at {}", + LOG_TRACE_FMT( + "Copying {} bytes into snapshot, starting at {}", range_size, range_start); memcpy(snapshot.data() + range_start, data.data(), data.size()); @@ -256,11 +321,8 @@ namespace snapshots const auto range_response = make_curl_request(snapshot_range_request); - if (range_response.status_code != HTTP_STATUS_PARTIAL_CONTENT) - { - LOG_FAIL_FMT( - "Got error HTTP response: {}", range_response.status_code); - } + EXPECT_HTTP_RESPONSE_STATUS( + snapshot_range_request, range_response, HTTP_STATUS_PARTIAL_CONTENT); if (range_end == content_size) { From 2e788c2ce6420134a25c530439df8bc6e763bea7 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 11 Dec 2024 13:20:26 +0000 Subject: [PATCH 12/17] You may not get the whole response at once! --- src/snapshots/fetch.h | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/snapshots/fetch.h b/src/snapshots/fetch.h index edbed2428ff4..f59b14148978 100644 --- a/src/snapshots/fetch.h +++ b/src/snapshots/fetch.h @@ -301,22 +301,12 @@ namespace snapshots snapshot_range_request.ca_path = path_to_peer_cert; snapshot_range_request.body_handler = [&](const auto& data) { - const auto range_size = range_end - range_start; - if (data.size() != range_size) - { - throw std::runtime_error(fmt::format( - "Requested {} bytes from {} {}, received {} in response", - range_size, - snapshot_range_request.method.c_str(), - snapshot_range_request.url, - data.size())); - } - LOG_TRACE_FMT( "Copying {} bytes into snapshot, starting at {}", range_size, range_start); memcpy(snapshot.data() + range_start, data.data(), data.size()); + range_start += data.size(); }; const auto range_response = make_curl_request(snapshot_range_request); From fc51feec90b696747bd327cb92c58b8d968a968a Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 11 Dec 2024 13:20:59 +0000 Subject: [PATCH 13/17] Some more verbose debug logs --- src/node/rpc/node_frontend.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/node/rpc/node_frontend.h b/src/node/rpc/node_frontend.h index c660186b5570..b55eee365c29 100644 --- a/src/node/rpc/node_frontend.h +++ b/src/node/rpc/node_frontend.h @@ -1935,6 +1935,8 @@ namespace ccf const auto range_header = ctx.rpc_ctx->get_request_header("range"); if (range_header.has_value()) { + LOG_TRACE_FMT("Parsing range header {}", range_header.value()); + auto [unit, ranges] = ccf::nonstd::split_1(range_header.value(), "="); if (unit != "bytes") @@ -2086,6 +2088,12 @@ namespace ccf const auto range_size = range_end - range_start; + LOG_TRACE_FMT( + "Reading {}-byte range from {} to {}", + range_size, + range_start, + range_end); + // Read requested range into buffer std::vector contents(range_size); f.seekg(range_start); From b7b6fd74ddf228a82d87b9b8150839a6d4f02b51 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 11 Dec 2024 13:21:11 +0000 Subject: [PATCH 14/17] Restore tests --- tests/e2e_operations.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index 11d0e0833707..c1c01e63a253 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -212,10 +212,6 @@ def test_large_snapshot(network, args): def test_snapshot_access(network, args): primary, _ = network.find_primary() - # TODO: Hacked in here to check copying - new_node = network.create_node("local://localhost") - network.join_node(new_node, args.package, args) - snapshots_dir = network.get_committed_snapshots(primary) snapshot_name = ccf.ledger.latest_snapshot(snapshots_dir) snapshot_index, _ = ccf.ledger.snapshot_index_from_filename(snapshot_name) @@ -394,12 +390,11 @@ def run_file_operations(args): r = c.get("/node/network").body.json() assert r["service_data"] == service_data - # TODO - # test_save_committed_ledger_files(network, args) - # test_parse_snapshot_file(network, args) - # test_forced_ledger_chunk(network, args) + test_save_committed_ledger_files(network, args) + test_parse_snapshot_file(network, args) + test_forced_ledger_chunk(network, args) test_forced_snapshot(network, args) - # test_large_snapshot(network, args) + test_large_snapshot(network, args) test_snapshot_access(network, args) primary, _ = network.find_primary() @@ -757,14 +752,13 @@ def run_cose_signatures_config_check(args): def run(args): - # TODO - # run_max_uncommitted_tx_count(args) + run_max_uncommitted_tx_count(args) run_file_operations(args) - # run_tls_san_checks(args) - # run_config_timeout_check(args) - # run_configuration_file_checks(args) - # run_pid_file_check(args) - # run_preopen_readiness_check(args) - # run_sighup_check(args) - # run_service_subject_name_check(args) - # run_cose_signatures_config_check(args) + run_tls_san_checks(args) + run_config_timeout_check(args) + run_configuration_file_checks(args) + run_pid_file_check(args) + run_preopen_readiness_check(args) + run_sighup_check(args) + run_service_subject_name_check(args) + run_cose_signatures_config_check(args) From 05c5bc3975d321f2a286fa73cd8a0ffa4b115486 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 11 Dec 2024 15:24:47 +0000 Subject: [PATCH 15/17] Format --- CMakeLists.txt | 11 +++++++++-- tests/e2e_operations.py | 10 +++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 53da448365a5..ba9eba626a8e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -255,8 +255,15 @@ elseif(COMPILE_TARGET STREQUAL "virtual") endif() target_link_libraries( - cchost PRIVATE uv ${TLS_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT} - ${LINK_LIBCXX} ccfcrypto.host curl http_parser.host + cchost + PRIVATE uv + ${TLS_LIBRARY} + ${CMAKE_DL_LIBS} + ${CMAKE_THREAD_LIBS_INIT} + ${LINK_LIBCXX} + ccfcrypto.host + curl + http_parser.host ) install(TARGETS cchost DESTINATION bin) diff --git a/tests/e2e_operations.py b/tests/e2e_operations.py index c1c01e63a253..48344d4138cf 100644 --- a/tests/e2e_operations.py +++ b/tests/e2e_operations.py @@ -294,14 +294,14 @@ def test_snapshot_access(network, args): # Check error handling for invalid ranges for invalid_range, err_msg in [ (f"{a}-foo", "Unable to parse end of range value foo"), - (f"foo-foo", "Unable to parse start of range value foo"), + ("foo-foo", "Unable to parse start of range value foo"), (f"foo-{b}", "Unable to parse start of range value foo"), (f"{b}-{a}", "out of order"), (f"0-{total_size + 1}", "larger than total file size"), - (f"-1-5", "Invalid format"), - (f"-", "Invalid range"), - (f"-foo", "Unable to parse end of range offset value foo"), - (f"", "Invalid format"), + ("-1-5", "Invalid format"), + ("-", "Invalid range"), + ("-foo", "Unable to parse end of range offset value foo"), + ("", "Invalid format"), ]: r = c.get(location, headers={"range": f"bytes={invalid_range}"}) assert r.status_code == http.HTTPStatus.BAD_REQUEST.value, r From 1d794a65421a2171312004e3414bbd1e2775dde7 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 11 Dec 2024 15:54:36 +0000 Subject: [PATCH 16/17] Install curl-devel on AzLinux --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f33072174a35..96bc8c9a1328 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,7 +114,7 @@ jobs: # Build tools tdnf -y install build-essential clang cmake ninja-build which # Dependencies - tdnf -y install openssl-devel libuv-devel + tdnf -y install openssl-devel libuv-devel curl-devel # Test dependencies tdnf -y install libarrow-devel parquet-libs-devel lldb shell: bash From 30d1c24e72aee6fd14f0dbf22a4dff33f5d9dfb5 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Fri, 13 Dec 2024 12:58:21 +0000 Subject: [PATCH 17/17] Configuration to disable fetching snapshot, for tests --- doc/host_config_schema/cchost_config.json | 5 + src/host/configuration.h | 6 +- src/host/main.cpp | 4 +- src/snapshots/fetch.h | 238 +++++++++++----------- tests/config.jinja | 3 +- tests/infra/remote.py | 2 + tests/reconfiguration.py | 15 +- 7 files changed, 153 insertions(+), 120 deletions(-) diff --git a/doc/host_config_schema/cchost_config.json b/doc/host_config_schema/cchost_config.json index f457ccbac250..4b06681c18b1 100644 --- a/doc/host_config_schema/cchost_config.json +++ b/doc/host_config_schema/cchost_config.json @@ -396,6 +396,11 @@ "type": "boolean", "default": true, "description": "Whether to follow redirects to the primary node of the existing service to join" + }, + "fetch_recent_snapshot": { + "type": "boolean", + "default": true, + "description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. Should generally only be turned off for specific test cases" } }, "required": ["target_rpc_address"], diff --git a/src/host/configuration.h b/src/host/configuration.h index 7761236ee0c6..d6b9765b4fc9 100644 --- a/src/host/configuration.h +++ b/src/host/configuration.h @@ -145,6 +145,7 @@ namespace host ccf::NodeInfoNetwork::NetAddress target_rpc_address; ccf::ds::TimeString retry_timeout = {"1000ms"}; bool follow_redirect = true; + bool fetch_recent_snapshot = true; bool operator==(const Join&) const = default; }; @@ -201,7 +202,10 @@ namespace host DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Join); DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Join, target_rpc_address); DECLARE_JSON_OPTIONAL_FIELDS( - CCHostConfig::Command::Join, retry_timeout, follow_redirect); + CCHostConfig::Command::Join, + retry_timeout, + follow_redirect, + fetch_recent_snapshot); DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover); DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover); diff --git a/src/host/main.cpp b/src/host/main.cpp index a1e41e07c4ea..888cada048fc 100644 --- a/src/host/main.cpp +++ b/src/host/main.cpp @@ -691,7 +691,9 @@ int main(int argc, char** argv) { auto latest_local_snapshot = snapshots.find_latest_committed_snapshot(); - if (config.command.type == StartType::Join) + if ( + config.command.type == StartType::Join && + config.command.join.fetch_recent_snapshot) { // Try to fetch a recent snapshot from peer const size_t latest_local_idx = latest_local_snapshot.has_value() ? diff --git a/src/snapshots/fetch.h b/src/snapshots/fetch.h index f59b14148978..d3ee499c9cbf 100644 --- a/src/snapshots/fetch.h +++ b/src/snapshots/fetch.h @@ -193,140 +193,150 @@ namespace snapshots const std::string& path_to_peer_cert, size_t latest_local_snapshot) { - // Make initial request, which returns a redirect response to specific - // snapshot - std::string snapshot_url; + try { - const auto initial_url = fmt::format( - "https://{}/node/snapshot?since={}", - peer_address, - latest_local_snapshot); - - SimpleHTTPRequest initial_request; - initial_request.method = HTTP_HEAD; - initial_request.url = initial_url; - initial_request.ca_path = path_to_peer_cert; - - const auto initial_response = make_curl_request(initial_request); - if (initial_response.status_code == HTTP_STATUS_NOT_FOUND) + // Make initial request, which returns a redirect response to specific + // snapshot + std::string snapshot_url; { - LOG_INFO_FMT( - "Peer has no snapshot newer than {}", latest_local_snapshot); - return std::nullopt; - } - else if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT) - { - EXPECT_HTTP_RESPONSE_STATUS( - initial_request, initial_response, HTTP_STATUS_PERMANENT_REDIRECT); - } + const auto initial_url = fmt::format( + "https://{}/node/snapshot?since={}", + peer_address, + latest_local_snapshot); + + SimpleHTTPRequest initial_request; + initial_request.method = HTTP_HEAD; + initial_request.url = initial_url; + initial_request.ca_path = path_to_peer_cert; + + const auto initial_response = make_curl_request(initial_request); + if (initial_response.status_code == HTTP_STATUS_NOT_FOUND) + { + LOG_INFO_FMT( + "Peer has no snapshot newer than {}", latest_local_snapshot); + return std::nullopt; + } + else if (initial_response.status_code != HTTP_STATUS_PERMANENT_REDIRECT) + { + EXPECT_HTTP_RESPONSE_STATUS( + initial_request, initial_response, HTTP_STATUS_PERMANENT_REDIRECT); + } - auto location_it = - initial_response.headers.find(ccf::http::headers::LOCATION); - if (location_it == initial_response.headers.end()) - { - throw std::runtime_error(fmt::format( - "Expected {} header in redirect response from {} {}, none found", - ccf::http::headers::LOCATION, - initial_request.method.c_str(), - initial_request.url)); - } + auto location_it = + initial_response.headers.find(ccf::http::headers::LOCATION); + if (location_it == initial_response.headers.end()) + { + throw std::runtime_error(fmt::format( + "Expected {} header in redirect response from {} {}, none found", + ccf::http::headers::LOCATION, + initial_request.method.c_str(), + initial_request.url)); + } - LOG_TRACE_FMT("Snapshot fetch redirected to {}", location_it->second); + LOG_TRACE_FMT("Snapshot fetch redirected to {}", location_it->second); - snapshot_url = - fmt::format("https://{}{}", peer_address, location_it->second); - } + snapshot_url = + fmt::format("https://{}{}", peer_address, location_it->second); + } - // Make follow-up request to redirected URL, to fetch total content size - size_t content_size; - { - SimpleHTTPRequest snapshot_size_request; - snapshot_size_request.method = HTTP_HEAD; - snapshot_size_request.url = snapshot_url; - snapshot_size_request.ca_path = path_to_peer_cert; + // Make follow-up request to redirected URL, to fetch total content size + size_t content_size; + { + SimpleHTTPRequest snapshot_size_request; + snapshot_size_request.method = HTTP_HEAD; + snapshot_size_request.url = snapshot_url; + snapshot_size_request.ca_path = path_to_peer_cert; - const auto snapshot_size_response = - make_curl_request(snapshot_size_request); + const auto snapshot_size_response = + make_curl_request(snapshot_size_request); - EXPECT_HTTP_RESPONSE_STATUS( - snapshot_size_request, snapshot_size_response, HTTP_STATUS_OK); + EXPECT_HTTP_RESPONSE_STATUS( + snapshot_size_request, snapshot_size_response, HTTP_STATUS_OK); - auto content_size_it = - snapshot_size_response.headers.find(ccf::http::headers::CONTENT_LENGTH); - if (content_size_it == snapshot_size_response.headers.end()) - { - throw std::runtime_error(fmt::format( - "Expected {} header in redirect response from {} {}, none found", - ccf::http::headers::CONTENT_LENGTH, - snapshot_size_request.method.c_str(), - snapshot_size_request.url)); - } + auto content_size_it = snapshot_size_response.headers.find( + ccf::http::headers::CONTENT_LENGTH); + if (content_size_it == snapshot_size_response.headers.end()) + { + throw std::runtime_error(fmt::format( + "Expected {} header in redirect response from {} {}, none found", + ccf::http::headers::CONTENT_LENGTH, + snapshot_size_request.method.c_str(), + snapshot_size_request.url)); + } - const auto& content_size_s = content_size_it->second; - const auto [p, ec] = std::from_chars( - content_size_s.data(), - content_size_s.data() + content_size_s.size(), - content_size); - if (ec != std::errc()) - { - throw std::runtime_error(fmt::format( - "Invalid {} header in redirect response from {} {}: {}", - ccf::http::headers::CONTENT_LENGTH, - snapshot_size_request.method.c_str(), - snapshot_size_request.url, - ec)); + const auto& content_size_s = content_size_it->second; + const auto [p, ec] = std::from_chars( + content_size_s.data(), + content_size_s.data() + content_size_s.size(), + content_size); + if (ec != std::errc()) + { + throw std::runtime_error(fmt::format( + "Invalid {} header in redirect response from {} {}: {}", + ccf::http::headers::CONTENT_LENGTH, + snapshot_size_request.method.c_str(), + snapshot_size_request.url, + ec)); + } } - } - // Fetch 4MB chunks at a time - constexpr size_t range_size = 4 * 1024 * 1024; - LOG_TRACE_FMT( - "Preparing to fetch {}-byte snapshot from peer, {} bytes per-request", - content_size, - range_size); + // Fetch 4MB chunks at a time + constexpr size_t range_size = 4 * 1024 * 1024; + LOG_TRACE_FMT( + "Preparing to fetch {}-byte snapshot from peer, {} bytes per-request", + content_size, + range_size); - std::vector snapshot(content_size); + std::vector snapshot(content_size); - { - auto range_start = 0; - auto range_end = std::min(content_size, range_size); - - while (true) { - SimpleHTTPRequest snapshot_range_request; - snapshot_range_request.method = HTTP_GET; - snapshot_range_request.url = snapshot_url; - snapshot_range_request.headers["range"] = - fmt::format("bytes={}-{}", range_start, range_end); - snapshot_range_request.ca_path = path_to_peer_cert; - - snapshot_range_request.body_handler = [&](const auto& data) { - LOG_TRACE_FMT( - "Copying {} bytes into snapshot, starting at {}", - range_size, - range_start); - memcpy(snapshot.data() + range_start, data.data(), data.size()); - range_start += data.size(); - }; - - const auto range_response = make_curl_request(snapshot_range_request); - - EXPECT_HTTP_RESPONSE_STATUS( - snapshot_range_request, range_response, HTTP_STATUS_PARTIAL_CONTENT); + auto range_start = 0; + auto range_end = std::min(content_size, range_size); - if (range_end == content_size) + while (true) { - break; + SimpleHTTPRequest snapshot_range_request; + snapshot_range_request.method = HTTP_GET; + snapshot_range_request.url = snapshot_url; + snapshot_range_request.headers["range"] = + fmt::format("bytes={}-{}", range_start, range_end); + snapshot_range_request.ca_path = path_to_peer_cert; + + snapshot_range_request.body_handler = [&](const auto& data) { + LOG_TRACE_FMT( + "Copying {} bytes into snapshot, starting at {}", + range_size, + range_start); + memcpy(snapshot.data() + range_start, data.data(), data.size()); + range_start += data.size(); + }; + + const auto range_response = make_curl_request(snapshot_range_request); + + EXPECT_HTTP_RESPONSE_STATUS( + snapshot_range_request, + range_response, + HTTP_STATUS_PARTIAL_CONTENT); + + if (range_end == content_size) + { + break; + } + + range_start = range_end; + range_end = std::min(content_size, range_start + range_size); } - - range_start = range_end; - range_end = std::min(content_size, range_start + range_size); } - } - const auto url_components = ccf::nonstd::split(snapshot_url, "/"); - const std::string snapshot_name(url_components.back()); + const auto url_components = ccf::nonstd::split(snapshot_url, "/"); + const std::string snapshot_name(url_components.back()); - return SnapshotResponse{snapshot_name, std::move(snapshot)}; + return SnapshotResponse{snapshot_name, std::move(snapshot)}; + } + catch (const std::exception& e) + { + LOG_FAIL_FMT("Error during snapshot fetch: {}", e.what()); + return std::nullopt; + } } } \ No newline at end of file diff --git a/tests/config.jinja b/tests/config.jinja index 65407e0befb6..c5dfbb0ef6d3 100644 --- a/tests/config.jinja +++ b/tests/config.jinja @@ -50,7 +50,8 @@ { "retry_timeout": "{{ join_timer }}", "target_rpc_address": "{{ target_rpc_address }}", - "follow_redirect": {{ follow_redirect|tojson }} + "follow_redirect": {{ follow_redirect|tojson }}, + "fetch_recent_snapshot": {{ fetch_recent_snapshot|tojson }} }, "recover": { "initial_service_certificate_validity_days": {{ initial_service_cert_validity_days }}, diff --git a/tests/infra/remote.py b/tests/infra/remote.py index e7522d0734ef..2bf52563363f 100644 --- a/tests/infra/remote.py +++ b/tests/infra/remote.py @@ -336,6 +336,7 @@ def __init__( ignore_first_sigterm=False, node_container_image=None, follow_redirect=True, + fetch_recent_snapshot=True, max_uncommitted_tx_count=0, snp_security_policy_file=None, snp_uvm_endorsements_file=None, @@ -533,6 +534,7 @@ def __init__( ignore_first_sigterm=ignore_first_sigterm, node_address=remote_class.get_node_address(node_address), follow_redirect=follow_redirect, + fetch_recent_snapshot=fetch_recent_snapshot, max_uncommitted_tx_count=max_uncommitted_tx_count, snp_security_policy_file=snp_security_policy_file, snp_uvm_endorsements_file=snp_uvm_endorsements_file, diff --git a/tests/reconfiguration.py b/tests/reconfiguration.py index c79b3550450e..95375878f770 100644 --- a/tests/reconfiguration.py +++ b/tests/reconfiguration.py @@ -124,7 +124,13 @@ def test_add_node(network, args, from_snapshot=True): } ) ) - network.join_node(new_node, args.package, args, from_snapshot=from_snapshot) + network.join_node( + new_node, + args.package, + args, + from_snapshot=from_snapshot, + fetch_recent_snapshot=from_snapshot, + ) # Verify self-signed node certificate validity period new_node.verify_certificate_validity_period(interface_name=operator_rpc_interface) @@ -138,9 +144,10 @@ def test_add_node(network, args, from_snapshot=True): if not from_snapshot: with new_node.client() as c: s = c.get("/node/state") - assert s.body.json()["node_id"] == new_node.node_id + body = s.body.json() + assert body["node_id"] == new_node.node_id assert ( - s.body.json()["startup_seqno"] == 0 + body["startup_seqno"] == 0 ), "Node started without snapshot but reports startup seqno != 0" # Now that the node is trusted, verify endorsed certificate validity period @@ -868,6 +875,7 @@ def run_join_old_snapshot(args): args.package, args, from_snapshot=True, + fetch_recent_snapshot=False, snapshots_dir=tmp_dir, timeout=3, ) @@ -891,6 +899,7 @@ def run_join_old_snapshot(args): args.package, args, from_snapshot=False, + fetch_recent_snapshot=False, timeout=3, ) except infra.network.StartupSeqnoIsOld as e: