Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support file:// URLs for snapshot locations #1885

Merged
merged 2 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ mithril-infra/terraform.tfstate*
mithril-infra/*.tfvars
justfile

# Outputs of nix-build:
result
jpraynaud marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ As a minor extension, we have adopted a slightly different versioning convention

- Support for Mithril nodes footprint in Prometheus monitoring in infrastructure
- Add support for custom HTTP headers in Mithril client WASM library
- Support `file://` URLs for snapshot locations

- **UNSTABLE** Cardano stake distribution certification:

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-client"
version = "0.8.14"
version = "0.8.15"
description = "Mithril client library"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
113 changes: 95 additions & 18 deletions mithril-client/src/snapshot_downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use futures::StreamExt;
use reqwest::Url;
use reqwest::{Response, StatusCode};
use slog::{debug, Logger};
use std::fs;
use std::path::Path;
use tokio::fs::File;
use tokio::io::AsyncReadExt;

#[cfg(test)]
use mockall::automock;
Expand Down Expand Up @@ -78,6 +82,74 @@ impl HttpSnapshotDownloader {
status_code => Err(anyhow!("Unhandled error {status_code}")),
}
}

/// Converts a `file://` URL into a local filesystem path.
///
/// Returns `None` when `file_url` cannot be parsed as a URL, when its scheme is not `file`,
/// or when the URL cannot be converted to a file path. The resulting path is rendered with
/// `to_string_lossy` before being returned as an owned `String`.
fn file_scheme_to_local_path(file_url: &str) -> Option<String> {
    let parsed_url = Url::parse(file_url).ok()?;
    if parsed_url.scheme() != "file" {
        return None;
    }

    let local_path = parsed_url.to_file_path().ok()?;
    Some(local_path.to_string_lossy().into_owned())
}

/// Streams a snapshot archive from the local filesystem into `sender`.
///
/// The file at `local_path` is read chunk by chunk. Each chunk is sent through `sender`,
/// then `report_progress` is awaited with the cumulative number of bytes sent so far.
///
/// # Errors
///
/// Fails if the file cannot be opened or read, or if a chunk cannot be sent through
/// `sender`. Every error carries a context message; open and read errors name the
/// offending path.
async fn download_local_file<F, Fut>(
    &self,
    local_path: &str,
    sender: &flume::Sender<Vec<u8>>,
    report_progress: F,
) -> MithrilResult<()>
where
    F: Fn(u64) -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    // A larger read buffer is faster, less context switches.
    const READ_BUFFER_SIZE: usize = 16 * 1024 * 1024;

    // Stream the `location` directly from the local filesystem
    let mut downloaded_bytes: u64 = 0;
    let mut file = File::open(local_path)
        .await
        .with_context(|| format!("Local file read: could not open '{local_path}'"))?;

    loop {
        // A fresh buffer is allocated for every chunk because its ownership is handed over
        // to `sender`; the alternative would be cloning a shared buffer into the sender.
        let mut buffer = vec![0; READ_BUFFER_SIZE];
        let bytes_read = file
            .read(&mut buffer)
            .await
            .with_context(|| format!("Local file read: could not read from '{local_path}'"))?;
        if bytes_read == 0 {
            // Nothing more to read: stop streaming.
            break;
        }

        // Only forward the bytes that were actually read.
        buffer.truncate(bytes_read);
        sender.send_async(buffer).await.with_context(|| {
            format!(
                "Local file read: could not write {} bytes to stream.",
                bytes_read
            )
        })?;

        downloaded_bytes += bytes_read as u64;
        report_progress(downloaded_bytes).await;
    }

    Ok(())
}

/// Streams a snapshot archive from a remote `location` into `sender`.
///
/// The response obtained from `self.get` is consumed as a byte stream. Each received chunk
/// is copied into a `Vec<u8>` and sent through `sender`, then `report_progress` is awaited
/// with the cumulative number of bytes sent so far.
///
/// # Errors
///
/// Fails if the request fails, if the byte stream yields an error, or if a chunk cannot be
/// sent through `sender`.
async fn download_remote_file<F, Fut>(
    &self,
    location: &str,
    sender: &flume::Sender<Vec<u8>>,
    report_progress: F,
) -> MithrilResult<()>
where
    F: Fn(u64) -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    let mut byte_stream = self.get(location).await?.bytes_stream();
    let mut total_bytes: u64 = 0;

    while let Some(next_item) = byte_stream.next().await {
        let bytes = next_item.with_context(|| "Download: Could not read from byte stream")?;
        let chunk_size = bytes.len();

        sender
            .send_async(bytes.to_vec())
            .await
            .with_context(|| {
                format!("Download: could not write {} bytes to stream.", chunk_size)
            })?;

        total_bytes += chunk_size as u64;
        report_progress(total_bytes).await;
    }

    Ok(())
}
}

#[cfg_attr(test, automock)]
Expand All @@ -97,8 +169,6 @@ impl SnapshotDownloader for HttpSnapshotDownloader {
.context("Download-Unpack: prerequisite error"),
)?;
}
let mut downloaded_bytes: u64 = 0;
let mut remote_stream = self.get(location).await?.bytes_stream();
let (sender, receiver) = flume::bounded(5);

let dest_dir = target_dir.to_path_buf();
Expand All @@ -107,21 +177,22 @@ impl SnapshotDownloader for HttpSnapshotDownloader {
unpacker.unpack_snapshot(receiver, compression_algorithm, &dest_dir)
});

while let Some(item) = remote_stream.next().await {
let chunk = item.with_context(|| "Download: Could not read from byte stream")?;

sender.send_async(chunk.to_vec()).await.with_context(|| {
format!("Download: could not write {} bytes to stream.", chunk.len())
})?;

downloaded_bytes += chunk.len() as u64;
let report_progress = |downloaded_bytes: u64| async move {
self.feedback_sender
.send_event(MithrilEvent::SnapshotDownloadProgress {
download_id: download_id.to_owned(),
downloaded_bytes,
size: snapshot_size,
})
.await
};

if let Some(local_path) = Self::file_scheme_to_local_path(location) {
jpraynaud marked this conversation as resolved.
Show resolved Hide resolved
self.download_local_file(&local_path, &sender, report_progress)
.await?;
} else {
self.download_remote_file(location, &sender, report_progress)
.await?;
}

drop(sender); // Signal EOF
Expand All @@ -143,15 +214,21 @@ impl SnapshotDownloader for HttpSnapshotDownloader {
/// Checks that a snapshot can be found at `location`.
///
/// When `location` is a `file://` URL that resolves to a local path, the filesystem metadata
/// of that path is queried. Otherwise an HTTP HEAD request is sent to `location`: a `200 OK`
/// status yields `Ok(())`, any other status yields an error.
// NOTE(review): this listing appears to interleave the pre-change and post-change lines of
// the diff (the HEAD request and the status `match` each appear twice) — confirm against the
// merged file before relying on it.
async fn probe(&self, location: &str) -> MithrilResult<()> {
debug!(self.logger, "HEAD Snapshot location='{location}'.");

let request_builder = self.http_client.head(location);
let response = request_builder.send().await.with_context(|| {
format!("Cannot perform a HEAD for snapshot at location='{location}'")
})?;
if let Some(local_path) = Self::file_scheme_to_local_path(location) {
// NOTE(review): `std::fs::metadata` is a synchronous call made from an `async fn`;
// consider `tokio::fs::metadata(..).await` — confirm.
fs::metadata(local_path)
.with_context(|| format!("Local snapshot location='{location}' not found"))
.map(drop)
} else {
let request_builder = self.http_client.head(location);
let response = request_builder.send().await.with_context(|| {
format!("Cannot perform a HEAD for snapshot at location='{location}'")
})?;

match response.status() {
StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(anyhow!("Snapshot location='{location} not found")),
status_code => Err(anyhow!("Unhandled error {status_code}")),
match response.status() {
StatusCode::OK => Ok(()),
// NOTE(review): the message opens a quote before `{location}` but never closes it.
StatusCode::NOT_FOUND => Err(anyhow!("Snapshot location='{location} not found")),
status_code => Err(anyhow!("Unhandled error {status_code}")),
}
}
}
}
Loading