Skip to content

Commit

Permalink
feat: progress bars for asset upload (#4091)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericswanson-dfinity authored Jan 30, 2025
1 parent 1a7a929 commit bf4d3f1
Show file tree
Hide file tree
Showing 20 changed files with 402 additions and 54 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### fix: `dfx canister install` and `dfx deploy` with `--no-asset-upgrade` no longer hang indefinitely when wasm is not up to date

### feat: streamlined output during asset synchronization

# 0.25.0

### fix: correctly detects hyphenated Rust bin crates
Expand Down
8 changes: 4 additions & 4 deletions e2e/tests-dfx/assetscanister.bash
Original file line number Diff line number Diff line change
Expand Up @@ -905,14 +905,14 @@ check_permission_failure() {
dd if=/dev/urandom of=src/e2e_project_frontend/assets/asset2.bin bs=400000 count=1

dfx_start
assert_command dfx deploy
assert_command dfx deploy -v

assert_match '/asset1.bin 1/1'
assert_match '/asset2.bin 1/1'

dd if=/dev/urandom of=src/e2e_project_frontend/assets/asset2.bin bs=400000 count=1

assert_command dfx deploy
assert_command dfx deploy -v
assert_match '/asset1.bin.*is already installed'
assert_match '/asset2.bin 1/1'
}
Expand Down Expand Up @@ -1649,7 +1649,7 @@ EOF
assert_match "200 OK" "$stderr"

# redirect survives upgrade
assert_command dfx deploy --upgrade-unchanged
assert_command dfx deploy --upgrade-unchanged -v
assert_match "is already installed"

assert_command curl --fail -vv http://localhost:"$PORT"/test_alias_file.html?canisterId="$ID"
Expand Down Expand Up @@ -1922,7 +1922,7 @@ WARN: {
},
]' > src/e2e_project_frontend/assets/somedir/.ic-assets.json5

assert_command dfx deploy
assert_command dfx deploy -v
assert_match '/somedir/upload-me.txt 1/1 \(8 bytes\) sha [0-9a-z]* \(with cache and 1 header\)'
}

Expand Down
63 changes: 50 additions & 13 deletions src/canisters/frontend/ic-asset/src/batch_upload/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ use crate::error::CreateEncodingError;
use crate::error::CreateEncodingError::EncodeContentFailed;
use crate::error::CreateProjectAssetError;
use crate::error::SetEncodingError;
use crate::AssetSyncProgressRenderer;
use candid::Nat;
use futures::future::try_join_all;
use futures::TryFutureExt;
use ic_utils::Canister;
use mime::Mime;
use slog::{debug, info, Logger};
use slog::{debug, Logger};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::path::PathBuf;
Expand Down Expand Up @@ -89,14 +90,22 @@ impl<'agent> ChunkUploader<'agent> {
&self,
contents: &[u8],
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<usize, CreateChunkError> {
let uploader_chunk_id = self.chunks.fetch_add(1, Ordering::SeqCst);
self.bytes.fetch_add(contents.len(), Ordering::SeqCst);
if contents.len() == MAX_CHUNK_SIZE || self.api_version < 2 {
let canister_chunk_id =
create_chunk(&self.canister, &self.batch_id, contents, semaphores).await?;
let canister_chunk_id = create_chunk(
&self.canister,
&self.batch_id,
contents,
semaphores,
progress,
)
.await?;
let mut map = self.id_mapping.lock().await;
map.insert(uploader_chunk_id, canister_chunk_id);

Ok(uploader_chunk_id)
} else {
self.add_to_upload_queue(uploader_chunk_id, contents).await;
Expand All @@ -106,7 +115,8 @@ impl<'agent> ChunkUploader<'agent> {
// - Tested with: `for i in $(seq 1 50); do dd if=/dev/urandom of="src/hello_frontend/assets/file_$i.bin" bs=$(shuf -i 1-2000000 -n 1) count=1; done && dfx deploy hello_frontend`
// - Result: Roughly 15% of batches under 90% full.
// With other byte ranges (e.g. `shuf -i 1-3000000 -n 1`) stats improve significantly
self.upload_chunks(4 * MAX_CHUNK_SIZE, semaphores).await?;
self.upload_chunks(4 * MAX_CHUNK_SIZE, semaphores, progress)
.await?;
Ok(uploader_chunk_id)
}
}
Expand All @@ -115,6 +125,7 @@ impl<'agent> ChunkUploader<'agent> {
&self,
semaphores: &Semaphores,
mode: Mode,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), CreateChunkError> {
let max_retained_bytes = if mode == Mode::ByProposal {
// Never add data to the commit_batch args, because they have to fit in a single call.
Expand All @@ -124,7 +135,8 @@ impl<'agent> ChunkUploader<'agent> {
MAX_CHUNK_SIZE / 2
};

self.upload_chunks(max_retained_bytes, semaphores).await
self.upload_chunks(max_retained_bytes, semaphores, progress)
.await
}

pub(crate) fn bytes(&self) -> usize {
Expand Down Expand Up @@ -174,6 +186,7 @@ impl<'agent> ChunkUploader<'agent> {
&self,
max_retained_bytes: usize,
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), CreateChunkError> {
let mut queue = self.upload_queue.lock().await;

Expand Down Expand Up @@ -202,7 +215,7 @@ impl<'agent> ChunkUploader<'agent> {
try_join_all(batches.into_iter().map(|chunks| async move {
let (uploader_chunk_ids, chunks): (Vec<_>, Vec<_>) = chunks.into_iter().unzip();
let canister_chunk_ids =
create_chunks(&self.canister, &self.batch_id, chunks, semaphores).await?;
create_chunks(&self.canister, &self.batch_id, chunks, semaphores, progress).await?;
let mut map = self.id_mapping.lock().await;
for (uploader_id, canister_id) in uploader_chunk_ids
.into_iter()
Expand All @@ -227,6 +240,7 @@ async fn make_project_asset_encoding(
content_encoding: &str,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<ProjectAssetEncoding, CreateChunkError> {
let sha256 = content.sha256();

Expand All @@ -249,7 +263,7 @@ async fn make_project_asset_encoding(
};

let uploader_chunk_ids = if already_in_place {
info!(
debug!(
logger,
" {}{} ({} bytes) sha {} is already installed",
&asset_descriptor.key,
Expand All @@ -267,10 +281,11 @@ async fn make_project_asset_encoding(
content_encoding,
semaphores,
logger,
progress,
)
.await?
} else {
info!(
debug!(
logger,
" {}{} ({} bytes) sha {} will be uploaded",
&asset_descriptor.key,
Expand Down Expand Up @@ -298,6 +313,7 @@ async fn make_encoding(
force_encoding: bool,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Option<(String, ProjectAssetEncoding)>, CreateEncodingError> {
match encoder {
ContentEncoder::Identity => {
Expand All @@ -309,6 +325,7 @@ async fn make_encoding(
CONTENT_ENCODING_IDENTITY,
semaphores,
logger,
progress,
)
.await
.map_err(CreateEncodingError::CreateChunkFailed)?;
Expand All @@ -331,6 +348,7 @@ async fn make_encoding(
&content_encoding,
semaphores,
logger,
progress,
)
.await
.map_err(CreateEncodingError::CreateChunkFailed)?;
Expand All @@ -349,12 +367,14 @@ async fn make_encodings(
content: &Content,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<HashMap<String, ProjectAssetEncoding>, CreateEncodingError> {
let encoders = asset_descriptor
.config
.encodings
.clone()
.unwrap_or_else(|| default_encoders(&content.media_type));

// The identity encoding is always uploaded if it's in the list of chosen encodings.
// Other encodings are only uploaded if they save bytes compared to identity.
// The encoding is forced through the filter if there is no identity encoding to compare against.
Expand All @@ -372,6 +392,7 @@ async fn make_encodings(
force_encoding,
semaphores,
logger,
progress,
)
})
.collect();
Expand All @@ -392,6 +413,7 @@ async fn make_project_asset(
canister_assets: &HashMap<String, AssetDetails>,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<ProjectAsset, CreateProjectAssetError> {
let file_size = dfx_core::fs::metadata(&asset_descriptor.source)?.len();
let permits = std::cmp::max(
Expand All @@ -412,9 +434,13 @@ async fn make_project_asset(
&content,
semaphores,
logger,
progress,
)
.await?;

if let Some(progress) = progress {
progress.increment_complete_assets();
}
Ok(ProjectAsset {
asset_descriptor,
media_type: content.media_type,
Expand All @@ -428,9 +454,13 @@ pub(crate) async fn make_project_assets(
canister_assets: &HashMap<String, AssetDetails>,
mode: Mode,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<HashMap<String, ProjectAsset>, CreateProjectAssetError> {
let semaphores = Semaphores::new();

if let Some(progress) = progress {
progress.set_total_assets(asset_descriptors.len());
}
let project_asset_futures: Vec<_> = asset_descriptors
.iter()
.map(|loc| {
Expand All @@ -440,13 +470,14 @@ pub(crate) async fn make_project_assets(
canister_assets,
&semaphores,
logger,
progress,
)
})
.collect();
let project_assets = try_join_all(project_asset_futures).await?;
if let Some(uploader) = chunk_upload_target {
uploader
.finalize_upload(&semaphores, mode)
.finalize_upload(&semaphores, mode, progress)
.await
.map_err(|err| {
CreateProjectAssetError::CreateEncodingError(
Expand All @@ -470,11 +501,14 @@ async fn upload_content_chunks(
content_encoding: &str,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Vec<usize>, CreateChunkError> {
if content.data.is_empty() {
let empty = vec![];
let chunk_id = chunk_uploader.create_chunk(&empty, semaphores).await?;
info!(
let chunk_id = chunk_uploader
.create_chunk(&empty, semaphores, progress)
.await?;
debug!(
logger,
" {}{} 1/1 (0 bytes) sha {}",
&asset_descriptor.key,
Expand All @@ -484,16 +518,19 @@ async fn upload_content_chunks(
return Ok(vec![chunk_id]);
}

if let Some(progress) = progress {
progress.add_total_bytes(content.data.len());
}
let count = (content.data.len() + MAX_CHUNK_SIZE - 1) / MAX_CHUNK_SIZE;
let chunks_futures: Vec<_> = content
.data
.chunks(MAX_CHUNK_SIZE)
.enumerate()
.map(|(i, data_chunk)| {
chunk_uploader
.create_chunk(data_chunk, semaphores)
.create_chunk(data_chunk, semaphores, progress)
.map_ok(move |chunk_id| {
info!(
debug!(
logger,
" {}{} {}/{} ({} bytes) sha {} {}",
&asset_descriptor.key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
methods::method_names::GET_ASSET_PROPERTIES,
types::asset::{AssetDetails, AssetProperties, GetAssetPropertiesArgument},
},
AssetSyncProgressRenderer,
};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
Expand All @@ -20,9 +21,13 @@ const MAX_CONCURRENT_REQUESTS: usize = 50;
pub(crate) async fn get_assets_properties(
canister: &Canister<'_>,
canister_assets: &HashMap<String, AssetDetails>,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<HashMap<String, AssetProperties>, GetAssetPropertiesError> {
let semaphore = SharedSemaphore::new(true, MAX_CONCURRENT_REQUESTS);

if let Some(progress) = progress {
progress.set_asset_properties_to_retrieve(canister_assets.len());
}
let asset_ids = canister_assets.keys().cloned().collect::<Vec<_>>();
let futs = asset_ids
.iter()
Expand All @@ -40,7 +45,12 @@ pub(crate) async fn get_assets_properties(
let response = get_asset_properties(canister, asset_id).await;

match response {
Ok(asset_properties) => break Ok(asset_properties),
Ok(asset_properties) => {
if let Some(progress) = progress {
progress.inc_asset_properties_retrieved();
}
break Ok(asset_properties);
}
Err(agent_err) if !retryable(&agent_err) => {
break Err(agent_err);
}
Expand Down
10 changes: 10 additions & 0 deletions src/canisters/frontend/ic-asset/src/canister_api/methods/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::canister_api::types::batch_upload::common::{
CreateChunkRequest, CreateChunkResponse, CreateChunksRequest, CreateChunksResponse,
};
use crate::error::CreateChunkError;
use crate::AssetSyncProgressRenderer;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use candid::{Decode, Nat};
Expand All @@ -19,6 +20,7 @@ pub(crate) async fn create_chunk(
batch_id: &Nat,
content: &[u8],
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Nat, CreateChunkError> {
let _chunk_releaser = semaphores.create_chunk.acquire(1).await;
let batch_id = batch_id.clone();
Expand Down Expand Up @@ -58,6 +60,9 @@ pub(crate) async fn create_chunk(

match wait_result {
Ok((chunk_id,)) => {
if let Some(progress) = progress {
progress.add_uploaded_bytes(content.len());
}
return Ok(chunk_id);
}
Err(agent_err) if !retryable(&agent_err) => {
Expand All @@ -76,7 +81,9 @@ pub(crate) async fn create_chunks(
batch_id: &Nat,
content: Vec<Vec<u8>>,
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Vec<Nat>, CreateChunkError> {
let content_byte_len = content.iter().fold(0, |acc, x| acc + x.len());
let _chunk_releaser = semaphores.create_chunk.acquire(1).await;
let batch_id = batch_id.clone();
let args = CreateChunksRequest { batch_id, content };
Expand Down Expand Up @@ -115,6 +122,9 @@ pub(crate) async fn create_chunks(

match wait_result {
Ok((chunk_ids,)) => {
if let Some(progress) = progress {
progress.add_uploaded_bytes(content_byte_len);
}
return Ok(chunk_ids);
}
Err(agent_err) if !retryable(&agent_err) => {
Expand Down
Loading

0 comments on commit bf4d3f1

Please sign in to comment.