From 4aec97f8ea8a3ac5997f812f8f124065959cbce8 Mon Sep 17 00:00:00 2001 From: Michal Rus Date: Fri, 9 Aug 2024 15:14:58 +0200 Subject: [PATCH 1/2] feat: support `file://` URLs for snapshot locations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LW-11112 This is very useful for live demos, where you don’t want to repeatedly download from Google Cloud Storage because of the wait time and cost. --- .gitignore | 2 + mithril-client/src/snapshot_downloader.rs | 113 ++++++++++++++++++---- 2 files changed, 97 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 55419a848f1..8982015ff46 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ mithril-infra/terraform.tfstate* mithril-infra/*.tfvars justfile +# Outputs of nix-build: +result diff --git a/mithril-client/src/snapshot_downloader.rs b/mithril-client/src/snapshot_downloader.rs index 39b60ced3a7..14ab2ff12eb 100644 --- a/mithril-client/src/snapshot_downloader.rs +++ b/mithril-client/src/snapshot_downloader.rs @@ -10,9 +10,13 @@ use anyhow::{anyhow, Context}; use async_trait::async_trait; use futures::StreamExt; +use reqwest::Url; use reqwest::{Response, StatusCode}; use slog::{debug, Logger}; +use std::fs; use std::path::Path; +use tokio::fs::File; +use tokio::io::AsyncReadExt; #[cfg(test)] use mockall::automock; @@ -78,6 +82,74 @@ impl HttpSnapshotDownloader { status_code => Err(anyhow!("Unhandled error {status_code}")), } } + + fn file_scheme_to_local_path(file_url: &str) -> Option { + Url::parse(file_url) + .ok() + .filter(|url| url.scheme() == "file") + .and_then(|url| url.to_file_path().ok()) + .map(|path| path.to_string_lossy().into_owned()) + } + + async fn download_local_file( + &self, + local_path: &str, + sender: &flume::Sender>, + report_progress: F, + ) -> MithrilResult<()> + where + F: Fn(u64) -> Fut, + Fut: std::future::Future, + { + // Stream the `location` directly from the local filesystem + let mut downloaded_bytes: u64 = 0; + let mut file = File::open(local_path).await?; + + loop { + // We can either allocate here each time, or clone a shared buffer into sender. + // A larger read buffer is faster, less context switches: + let mut buffer = vec![0; 16 * 1024 * 1024]; + let bytes_read = file.read(&mut buffer).await?; + if bytes_read == 0 { + break; + } + buffer.truncate(bytes_read); + sender.send_async(buffer).await.with_context(|| { + format!( + "Local file read: could not write {} bytes to stream.", + bytes_read + ) + })?; + downloaded_bytes += bytes_read as u64; + report_progress(downloaded_bytes).await + } + Ok(()) + } + + async fn download_remote_file( + &self, + location: &str, + sender: &flume::Sender>, + report_progress: F, + ) -> MithrilResult<()> + where + F: Fn(u64) -> Fut, + Fut: std::future::Future, + { + let mut downloaded_bytes: u64 = 0; + let mut remote_stream = self.get(location).await?.bytes_stream(); + while let Some(item) = remote_stream.next().await { + let chunk = item.with_context(|| "Download: Could not read from byte stream")?; + + sender.send_async(chunk.to_vec()).await.with_context(|| { + format!("Download: could not write {} bytes to stream.", chunk.len()) + })?; + + downloaded_bytes += chunk.len() as u64; + report_progress(downloaded_bytes).await + } + Ok(()) + } } #[cfg_attr(test, automock)] @@ -97,8 +169,6 @@ impl SnapshotDownloader for HttpSnapshotDownloader { .context("Download-Unpack: prerequisite error"), )?; } - let mut downloaded_bytes: u64 = 0; - let mut remote_stream = self.get(location).await?.bytes_stream(); let (sender, receiver) = flume::bounded(5); let dest_dir = target_dir.to_path_buf(); @@ -107,14 +177,7 @@ impl SnapshotDownloader for HttpSnapshotDownloader { unpacker.unpack_snapshot(receiver, compression_algorithm, &dest_dir) }); - while let Some(item) = remote_stream.next().await { - let chunk = item.with_context(|| "Download: Could not read from byte stream")?; - - sender.send_async(chunk.to_vec()).await.with_context(|| { - format!("Download: could not write {} bytes to stream.", chunk.len()) - })?; - - downloaded_bytes += chunk.len() as u64; + let report_progress = |downloaded_bytes: u64| async move { self.feedback_sender .send_event(MithrilEvent::SnapshotDownloadProgress { download_id: download_id.to_owned(), @@ -122,6 +185,14 @@ impl SnapshotDownloader for HttpSnapshotDownloader { size: snapshot_size, }) .await + }; + + if let Some(local_path) = Self::file_scheme_to_local_path(location) { + self.download_local_file(&local_path, &sender, report_progress) + .await?; + } else { + self.download_remote_file(location, &sender, report_progress) + .await?; } drop(sender); // Signal EOF @@ -143,15 +214,21 @@ impl SnapshotDownloader for HttpSnapshotDownloader { async fn probe(&self, location: &str) -> MithrilResult<()> { debug!(self.logger, "HEAD Snapshot location='{location}'."); - let request_builder = self.http_client.head(location); - let response = request_builder.send().await.with_context(|| { - format!("Cannot perform a HEAD for snapshot at location='{location}'") - })?; + if let Some(local_path) = Self::file_scheme_to_local_path(location) { + fs::metadata(local_path) + .with_context(|| format!("Local snapshot location='{location}' not found")) + .map(drop) + } else { + let request_builder = self.http_client.head(location); + let response = request_builder.send().await.with_context(|| { + format!("Cannot perform a HEAD for snapshot at location='{location}'") + })?; - match response.status() { - StatusCode::OK => Ok(()), - StatusCode::NOT_FOUND => Err(anyhow!("Snapshot location='{location} not found")), - status_code => Err(anyhow!("Unhandled error {status_code}")), + match response.status() { + StatusCode::OK => Ok(()), + StatusCode::NOT_FOUND => Err(anyhow!("Snapshot location='{location} not found")), + status_code => Err(anyhow!("Unhandled error {status_code}")), + } } } } From 6ffd08cb49e968a042bc926c411fd35a39e487c2 Mon Sep 17 00:00:00 2001 From: Michal Rus Date: Fri, 23 Aug 2024 10:21:25 +0200 Subject: [PATCH 2/2] chore: update the crate version and the changelog LW-11112 --- CHANGELOG.md | 1 + Cargo.lock | 2 +- mithril-client/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 019d475fdec..4bdeb89f167 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ As a minor extension, we have adopted a slightly different versioning convention - Support for Mithril nodes footprint support in Prometheus monitoring in infrastructure - Add support for custom HTTP headers in Mithril client WASM library +- Support `file://` URLs for snapshot locations - **UNSTABLE** Cardano stake distribution certification: diff --git a/Cargo.lock b/Cargo.lock index f8ab16485c2..9ffbf40e59b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3629,7 +3629,7 @@ dependencies = [ [[package]] name = "mithril-client" -version = "0.8.14" +version = "0.8.15" dependencies = [ "anyhow", "async-recursion", diff --git a/mithril-client/Cargo.toml b/mithril-client/Cargo.toml index 314a77a506c..b6eb5d83284 100644 --- a/mithril-client/Cargo.toml +++ b/mithril-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-client" -version = "0.8.14" +version = "0.8.15" description = "Mithril client library" authors = { workspace = true } edition = { workspace = true }