Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch fresh snapshot from peer when joining #6700

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1830a62
Add a GET /node/snapshots endpoint
eddyashton Nov 18, 2024
e941949
Create new snapshots directory
eddyashton Nov 18, 2024
e559618
Redirect to a fixed-file endpoint, support Range header
eddyashton Nov 20, 2024
292091f
Test snapshot endpoints, from client side
eddyashton Nov 21, 2024
2494358
Test nonstd.cpp!
eddyashton Nov 22, 2024
9946801
Add nonstd::trim
eddyashton Nov 22, 2024
6bd65fd
First-pass hack of actually fetching a snapshot, via curl, from a pee…
eddyashton Nov 27, 2024
2589da8
Merge branch 'main' of github.com:microsoft/CCF into snapshots_endpoint
eddyashton Dec 10, 2024
42af3a7
Interesting - careful about HEAD errors!
eddyashton Dec 10, 2024
d9e9bed
Anyway: Add and test ?since parameter, to fetch minimum snapshot
eddyashton Dec 10, 2024
3473a62
Pass ?since when fetching, and only take newer snapshots
eddyashton Dec 10, 2024
b3bebe5
TODOs, return codes
eddyashton Dec 10, 2024
2e788c2
You may not get the whole response at once!
eddyashton Dec 11, 2024
fc51fee
Some more verbose debug logs
eddyashton Dec 11, 2024
b7b6fd7
Restore tests
eddyashton Dec 11, 2024
12b0c42
Merge branch 'main' of github.com:microsoft/CCF into snapshots_endpoint
eddyashton Dec 11, 2024
05c5bc3
Format
eddyashton Dec 11, 2024
1d794a6
Install curl-devel on AzLinux
eddyashton Dec 11, 2024
30d1c24
Configuration to disable fetching snapshot, for tests
eddyashton Dec 13, 2024
e09f35a
Merge branch 'main' of github.com:microsoft/CCF into snapshots_endpoint
eddyashton Dec 13, 2024
ad8edaa
Merge branch 'main' into snapshots_endpoint
eddyashton Dec 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ jobs:
# Build tools
tdnf -y install build-essential clang cmake ninja-build which
# Dependencies
tdnf -y install openssl-devel libuv-devel
tdnf -y install openssl-devel libuv-devel curl-devel
# Test dependencies
tdnf -y install libarrow-devel parquet-libs-devel lldb
shell: bash
Expand Down
12 changes: 10 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,15 @@ elseif(COMPILE_TARGET STREQUAL "virtual")
endif()

target_link_libraries(
cchost PRIVATE uv ${TLS_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT}
${LINK_LIBCXX} ccfcrypto.host
cchost
PRIVATE uv
${TLS_LIBRARY}
${CMAKE_DL_LIBS}
${CMAKE_THREAD_LIBS_INIT}
${LINK_LIBCXX}
ccfcrypto.host
curl
http_parser.host
)

install(TARGETS cchost DESTINATION bin)
Expand Down Expand Up @@ -747,6 +754,7 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/contiguous_set.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/unit_strings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/dl_list.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/nonstd.cpp
)
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})

Expand Down
5 changes: 5 additions & 0 deletions doc/host_config_schema/cchost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@
"type": "boolean",
"default": true,
"description": "Whether to follow redirects to the primary node of the existing service to join"
},
"fetch_recent_snapshot": {
"type": "boolean",
"default": true,
"description": "Whether to ask the target for a newer snapshot before joining. The node will ask the target what their latest snapshot is, and if that is later than what the node has locally, will fetch it via RPC before launching. Should generally only be turned off for specific test cases"
}
},
"required": ["target_rpc_address"],
Expand Down
8 changes: 8 additions & 0 deletions include/ccf/ds/nonstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ namespace ccf::nonstd
});
}

/// Strip any leading and trailing characters found in trim_chars from s.
/// The returned view aliases s; interior occurrences of trim_chars are kept.
static inline std::string_view trim(
  std::string_view s, std::string_view trim_chars = " \t\r\n")
{
  const auto first_kept = s.find_first_not_of(trim_chars);
  if (first_kept == std::string_view::npos)
  {
    // Empty input, or every character is trimmable: return a zero-length
    // view positioned at the end of s
    return s.substr(s.size());
  }

  // A kept character exists, so find_last_not_of cannot fail here
  const auto last_kept = s.find_last_not_of(trim_chars);
  return s.substr(first_kept, last_kept - first_kept + 1);
}

/// Iterate through tuple, calling functor on each element
template <size_t I = 0, typename F, typename... Ts>
static void tuple_for_each(const std::tuple<Ts...>& t, const F& f)
Expand Down
10 changes: 10 additions & 0 deletions include/ccf/node/startup_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ namespace ccf
bool operator==(const Attestation&) const = default;
};
Attestation attestation = {};

// Snapshot-related settings. Every field has a default, so the whole section
// may be omitted from the configuration.
struct Snapshots
{
// Directory holding snapshot files (default "snapshots"). The host passes
// this to its SnapshotManager and writes a snapshot fetched from a peer here.
std::string directory = "snapshots";
// NOTE(review): presumably the number of transactions between snapshots —
// confirm against the consumer of this value.
size_t tx_count = 10'000;
// Optional second directory, also passed to the host's SnapshotManager.
// NOTE(review): presumably only ever read from — confirm.
std::optional<std::string> read_only_directory = std::nullopt;

bool operator==(const Snapshots&) const = default;
};
// Instance embedded in the enclosing config, initialised to the defaults above
Snapshots snapshots = {};
};

struct StartupConfig : CCFConfig
Expand Down
24 changes: 24 additions & 0 deletions python/src/ccf/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ def range_from_filename(filename: str) -> Tuple[int, Optional[int]]:
raise ValueError(f"Could not read seqno range from ledger file {filename}")


def snapshot_index_from_filename(filename: str) -> Tuple[int, int]:
    """
    Extract the two integers encoded in a snapshot file name.

    The directory part of ``filename`` is ignored. Every occurrence of the
    committed suffix and of ``snapshot_`` is removed from the base name, and
    what remains must split on ``_`` into exactly two pieces.

    :param filename: Path or bare name of a snapshot file.
    :return: The two pieces, converted to ``int``, in file-name order.
    :raises ValueError: If the name does not split into exactly two pieces, or
        a piece is not a valid integer.
    """
    stem = os.path.basename(filename)
    for fragment in (COMMITTED_FILE_SUFFIX, "snapshot_"):
        stem = stem.replace(fragment, "")

    parts = stem.split("_")
    if len(parts) != 2:
        raise ValueError(f"Could not read snapshot index from file name {filename}")

    first, second = parts
    return (int(first), int(second))


class GcmHeader:
_gcm_tag = ["\0"] * GCM_SIZE_TAG
_gcm_iv = ["\0"] * GCM_SIZE_IV
Expand Down Expand Up @@ -856,6 +869,17 @@ def get_len(self) -> int:
return self._file_size


def latest_snapshot(snapshots_dir):
    """
    Find the snapshot with the highest seqno in ``snapshots_dir``.

    Every directory entry is opened as a snapshot and the seqno of its public
    domain is compared; on a tie the entry listed first wins.

    :param snapshots_dir: Directory to scan.
    :return: File name (not the full path) of the best snapshot, or ``None``
        if the directory has no entries.
    """
    newest = None  # (seqno, file name) of the best candidate seen so far
    for entry in os.listdir(snapshots_dir):
        with ccf.ledger.Snapshot(os.path.join(snapshots_dir, entry)) as candidate:
            seqno = candidate.get_public_domain().get_seqno()
            if newest is None or seqno > newest[0]:
                newest = (seqno, entry)

    if newest is None:
        return None
    return newest[1]


class LedgerChunk:
"""
Class used to parse and iterate over :py:class:`ccf.ledger.Transaction` in a CCF ledger chunk.
Expand Down
6 changes: 6 additions & 0 deletions src/common/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ namespace ccf
snp_security_policy_file,
snp_uvm_endorsements_file);

// JSON mapping for CCFConfig::Snapshots: no field is required, and
// directory, tx_count and read_only_directory are all declared optional.
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig::Snapshots);
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig::Snapshots);
DECLARE_JSON_OPTIONAL_FIELDS(
CCFConfig::Snapshots, directory, tx_count, read_only_directory);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCFConfig);
DECLARE_JSON_REQUIRED_FIELDS(CCFConfig, network);
DECLARE_JSON_OPTIONAL_FIELDS(
Expand All @@ -94,6 +99,7 @@ namespace ccf
ledger_signatures,
jwt,
attestation,
snapshots,
node_to_node_message_limit,
historical_cache_soft_limit);

Expand Down
16 changes: 16 additions & 0 deletions src/ds/test/nonstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,19 @@ TEST_CASE("rsplit" * doctest::test_suite("nonstd"))
}
}
}

// Exercises ccf::nonstd::trim with the default and a custom trim-character set
TEST_CASE("trim" * doctest::test_suite("nonstd"))
{
// Default set: spaces, tabs, carriage returns and newlines are stripped from
// both ends, while the interior space is kept
REQUIRE(ccf::nonstd::trim(" hello world ") == "hello world");
REQUIRE(
ccf::nonstd::trim(" \r\n\t\nhello world\n\n\r\t\t\n\t \n\t") ==
"hello world");
// '.' is only removed when passed explicitly as a trim character
REQUIRE(ccf::nonstd::trim("..hello..") == "..hello..");
REQUIRE(ccf::nonstd::trim("..hello..", ".") == "hello");

// Edge cases: nothing to trim, one-sided trimming, all-whitespace input and
// empty input
REQUIRE(ccf::nonstd::trim("hello") == "hello");
REQUIRE(ccf::nonstd::trim(" h") == "h");
REQUIRE(ccf::nonstd::trim("h ") == "h");
REQUIRE(ccf::nonstd::trim(" ") == "");
REQUIRE(ccf::nonstd::trim("") == "");
}
21 changes: 5 additions & 16 deletions src/host/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,6 @@ namespace host
};
Ledger ledger = {};

struct Snapshots
{
std::string directory = "snapshots";
size_t tx_count = 10'000;
std::optional<std::string> read_only_directory = std::nullopt;

bool operator==(const Snapshots&) const = default;
};
Snapshots snapshots = {};

struct Logging
{
ccf::LoggerLevel host_level = ccf::LoggerLevel::INFO;
Expand Down Expand Up @@ -155,6 +145,7 @@ namespace host
ccf::NodeInfoNetwork::NetAddress target_rpc_address;
ccf::ds::TimeString retry_timeout = {"1000ms"};
bool follow_redirect = true;
bool fetch_recent_snapshot = true;

bool operator==(const Join&) const = default;
};
Expand Down Expand Up @@ -189,11 +180,6 @@ namespace host
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Ledger, directory, read_only_directories, chunk_size);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Snapshots);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Snapshots);
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Snapshots, directory, tx_count, read_only_directory);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Logging);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Logging);
DECLARE_JSON_OPTIONAL_FIELDS(CCHostConfig::Logging, host_level, format);
Expand All @@ -216,7 +202,10 @@ namespace host
DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Join);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Join, target_rpc_address);
DECLARE_JSON_OPTIONAL_FIELDS(
CCHostConfig::Command::Join, retry_timeout, follow_redirect);
CCHostConfig::Command::Join,
retry_timeout,
follow_redirect,
fetch_recent_snapshot);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(CCHostConfig::Command::Recover);
DECLARE_JSON_REQUIRED_FIELDS(CCHostConfig::Command::Recover);
Expand Down
73 changes: 56 additions & 17 deletions src/host/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#include "process_launcher.h"
#include "rpc_connections.h"
#include "sig_term.h"
#include "snapshots.h"
#include "snapshots/fetch.h"
#include "snapshots/snapshot_manager.h"
#include "ticker.h"
#include "time_updater.h"

Expand Down Expand Up @@ -376,7 +377,7 @@ int main(int argc, char** argv)
config.ledger.read_only_directories);
ledger.register_message_handlers(bp.get_dispatcher());

asynchost::SnapshotManager snapshots(
snapshots::SnapshotManager snapshots(
config.snapshots.directory,
writer_factory,
config.snapshots.read_only_directory);
Expand Down Expand Up @@ -507,8 +508,6 @@ int main(int argc, char** argv)

ccf::StartupConfig startup_config(config);

startup_config.snapshot_tx_interval = config.snapshots.tx_count;

if (startup_config.attestation.snp_security_policy_file.has_value())
{
auto security_policy_file =
Expand Down Expand Up @@ -690,22 +689,62 @@ int main(int argc, char** argv)
config.command.type == StartType::Join ||
config.command.type == StartType::Recover)
{
auto latest_committed_snapshot =
snapshots.find_latest_committed_snapshot();
if (latest_committed_snapshot.has_value())
{
auto& [snapshot_dir, snapshot_file] = latest_committed_snapshot.value();
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);
auto latest_local_snapshot = snapshots.find_latest_committed_snapshot();

LOG_INFO_FMT(
"Found latest snapshot file: {} (size: {})",
snapshot_dir / snapshot_file,
startup_snapshot.size());
if (
config.command.type == StartType::Join &&
config.command.join.fetch_recent_snapshot)
{
// Try to fetch a recent snapshot from peer
const size_t latest_local_idx = latest_local_snapshot.has_value() ?
snapshots::get_snapshot_idx_from_file_name(
latest_local_snapshot->second) :
0;
auto latest_peer_snapshot = snapshots::fetch_from_peer(
config.command.join.target_rpc_address,
config.command.service_certificate_file,
latest_local_idx);

if (latest_peer_snapshot.has_value())
{
LOG_INFO_FMT(
"Received snapshot {} from peer (size: {}) - writing this to disk "
"and using for join startup",
latest_peer_snapshot->snapshot_name,
latest_peer_snapshot->snapshot_data.size());

const auto dst_path = fs::path(config.snapshots.directory) /
fs::path(latest_peer_snapshot->snapshot_name);
if (files::exists(dst_path))
{
LOG_FATAL_FMT(
"Unable to write peer snapshot - already have a file at {}. "
"Exiting.",
dst_path);
return static_cast<int>(CLI::ExitCodes::FileError);
}
files::dump(latest_peer_snapshot->snapshot_data, dst_path);
startup_snapshot = latest_peer_snapshot->snapshot_data;
}
}
else

if (startup_snapshot.empty())
{
LOG_INFO_FMT(
"No snapshot found: Node will replay all historical transactions");
if (latest_local_snapshot.has_value())
{
auto& [snapshot_dir, snapshot_file] = latest_local_snapshot.value();
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);

LOG_INFO_FMT(
"Found latest local snapshot file: {} (size: {})",
snapshot_dir / snapshot_file,
startup_snapshot.size());
}
else
{
LOG_INFO_FMT(
"No snapshot found: Node will replay all historical transactions");
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/host/test/ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
#include "crypto/openssl/hash.h"
#include "ds/files.h"
#include "ds/serialized.h"
#include "host/snapshots.h"
#include "kv/serialised_entry_format.h"
#include "snapshots/snapshot_manager.h"

#define DOCTEST_CONFIG_IMPLEMENT
#include <doctest/doctest.h>
Expand Down Expand Up @@ -1259,6 +1259,8 @@ TEST_CASE("Snapshot file name" * doctest::test_suite("snapshot"))
std::vector<size_t> snapshot_idx_interval_ranges = {
10, 1000, 10000, std::numeric_limits<size_t>::max() - 2};

using namespace snapshots;

for (auto const& snapshot_idx_interval_range : snapshot_idx_interval_ranges)
{
std::uniform_int_distribution<size_t> dist(1, snapshot_idx_interval_range);
Expand Down Expand Up @@ -1304,6 +1306,7 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot"))
auto snap_ro_dir = AutoDeleteFolder(snapshot_dir_read_only);
fs::create_directory(snapshot_dir_read_only);

using namespace snapshots;
SnapshotManager snapshots(snapshot_dir, wf, snapshot_dir_read_only);

size_t snapshot_interval = 5;
Expand Down
30 changes: 21 additions & 9 deletions src/http/http_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,46 @@ namespace http
return body;
}

void set_body(const std::vector<uint8_t>* b)
void set_body(
const std::vector<uint8_t>* b, bool overwrite_content_length = true)
{
if (b != nullptr)
{
set_body(b->data(), b->size());
set_body(b->data(), b->size(), overwrite_content_length);
}
else
{
set_body(nullptr, 0);
set_body(nullptr, 0, overwrite_content_length);
}
}

void set_body(const uint8_t* b, size_t s)
void set_body(
const uint8_t* b, size_t s, bool overwrite_content_length = true)
{
body = b;
body_size = s;

headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
if (
overwrite_content_length ||
headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end())
{
headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
}
}

void set_body(const std::string& s)
void set_body(const std::string& s, bool overwrite_content_length = true)
{
body = (uint8_t*)s.data();
body_size = s.size();

headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
if (
overwrite_content_length ||
headers.find(ccf::http::headers::CONTENT_LENGTH) == headers.end())
{
headers[ccf::http::headers::CONTENT_LENGTH] =
fmt::format("{}", get_content_length());
}
}
};

Expand Down
Loading
Loading