Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: progress bars for asset upload #4091

Merged
merged 4 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### fix: `dfx canister install` and `dfx deploy` with `--no-asset-upgrade` no longer hang indefinitely when wasm is not up to date

### feat: streamlined output during asset synchronization

# 0.25.0

### fix: correctly detects hyphenated Rust bin crates
Expand Down
8 changes: 4 additions & 4 deletions e2e/tests-dfx/assetscanister.bash
Original file line number Diff line number Diff line change
Expand Up @@ -905,14 +905,14 @@ check_permission_failure() {
dd if=/dev/urandom of=src/e2e_project_frontend/assets/asset2.bin bs=400000 count=1

dfx_start
assert_command dfx deploy
assert_command dfx deploy -v

assert_match '/asset1.bin 1/1'
assert_match '/asset2.bin 1/1'

dd if=/dev/urandom of=src/e2e_project_frontend/assets/asset2.bin bs=400000 count=1

assert_command dfx deploy
assert_command dfx deploy -v
assert_match '/asset1.bin.*is already installed'
assert_match '/asset2.bin 1/1'
}
Expand Down Expand Up @@ -1649,7 +1649,7 @@ EOF
assert_match "200 OK" "$stderr"

# redirect survives upgrade
assert_command dfx deploy --upgrade-unchanged
assert_command dfx deploy --upgrade-unchanged -v
assert_match "is already installed"

assert_command curl --fail -vv http://localhost:"$PORT"/test_alias_file.html?canisterId="$ID"
Expand Down Expand Up @@ -1922,7 +1922,7 @@ WARN: {
},
]' > src/e2e_project_frontend/assets/somedir/.ic-assets.json5

assert_command dfx deploy
assert_command dfx deploy -v
assert_match '/somedir/upload-me.txt 1/1 \(8 bytes\) sha [0-9a-z]* \(with cache and 1 header\)'
}

Expand Down
63 changes: 50 additions & 13 deletions src/canisters/frontend/ic-asset/src/batch_upload/plumbing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ use crate::error::CreateEncodingError;
use crate::error::CreateEncodingError::EncodeContentFailed;
use crate::error::CreateProjectAssetError;
use crate::error::SetEncodingError;
use crate::AssetSyncProgressRenderer;
use candid::Nat;
use futures::future::try_join_all;
use futures::TryFutureExt;
use ic_utils::Canister;
use mime::Mime;
use slog::{debug, info, Logger};
use slog::{debug, Logger};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::path::PathBuf;
Expand Down Expand Up @@ -89,14 +90,22 @@ impl<'agent> ChunkUploader<'agent> {
&self,
contents: &[u8],
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<usize, CreateChunkError> {
let uploader_chunk_id = self.chunks.fetch_add(1, Ordering::SeqCst);
self.bytes.fetch_add(contents.len(), Ordering::SeqCst);
if contents.len() == MAX_CHUNK_SIZE || self.api_version < 2 {
let canister_chunk_id =
create_chunk(&self.canister, &self.batch_id, contents, semaphores).await?;
let canister_chunk_id = create_chunk(
&self.canister,
&self.batch_id,
contents,
semaphores,
progress,
)
.await?;
let mut map = self.id_mapping.lock().await;
map.insert(uploader_chunk_id, canister_chunk_id);

Ok(uploader_chunk_id)
} else {
self.add_to_upload_queue(uploader_chunk_id, contents).await;
Expand All @@ -106,7 +115,8 @@ impl<'agent> ChunkUploader<'agent> {
// - Tested with: `for i in $(seq 1 50); do dd if=/dev/urandom of="src/hello_frontend/assets/file_$i.bin" bs=$(shuf -i 1-2000000 -n 1) count=1; done && dfx deploy hello_frontend`
// - Result: Roughly 15% of batches under 90% full.
// With other byte ranges (e.g. `shuf -i 1-3000000 -n 1`) stats improve significantly
self.upload_chunks(4 * MAX_CHUNK_SIZE, semaphores).await?;
self.upload_chunks(4 * MAX_CHUNK_SIZE, semaphores, progress)
.await?;
Ok(uploader_chunk_id)
}
}
Expand All @@ -115,6 +125,7 @@ impl<'agent> ChunkUploader<'agent> {
&self,
semaphores: &Semaphores,
mode: Mode,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), CreateChunkError> {
let max_retained_bytes = if mode == Mode::ByProposal {
// Never add data to the commit_batch args, because they have to fit in a single call.
Expand All @@ -124,7 +135,8 @@ impl<'agent> ChunkUploader<'agent> {
MAX_CHUNK_SIZE / 2
};

self.upload_chunks(max_retained_bytes, semaphores).await
self.upload_chunks(max_retained_bytes, semaphores, progress)
.await
}

pub(crate) fn bytes(&self) -> usize {
Expand Down Expand Up @@ -174,6 +186,7 @@ impl<'agent> ChunkUploader<'agent> {
&self,
max_retained_bytes: usize,
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), CreateChunkError> {
let mut queue = self.upload_queue.lock().await;

Expand Down Expand Up @@ -202,7 +215,7 @@ impl<'agent> ChunkUploader<'agent> {
try_join_all(batches.into_iter().map(|chunks| async move {
let (uploader_chunk_ids, chunks): (Vec<_>, Vec<_>) = chunks.into_iter().unzip();
let canister_chunk_ids =
create_chunks(&self.canister, &self.batch_id, chunks, semaphores).await?;
create_chunks(&self.canister, &self.batch_id, chunks, semaphores, progress).await?;
let mut map = self.id_mapping.lock().await;
for (uploader_id, canister_id) in uploader_chunk_ids
.into_iter()
Expand All @@ -227,6 +240,7 @@ async fn make_project_asset_encoding(
content_encoding: &str,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<ProjectAssetEncoding, CreateChunkError> {
let sha256 = content.sha256();

Expand All @@ -249,7 +263,7 @@ async fn make_project_asset_encoding(
};

let uploader_chunk_ids = if already_in_place {
info!(
debug!(
logger,
" {}{} ({} bytes) sha {} is already installed",
&asset_descriptor.key,
Expand All @@ -267,10 +281,11 @@ async fn make_project_asset_encoding(
content_encoding,
semaphores,
logger,
progress,
)
.await?
} else {
info!(
debug!(
logger,
" {}{} ({} bytes) sha {} will be uploaded",
&asset_descriptor.key,
Expand Down Expand Up @@ -298,6 +313,7 @@ async fn make_encoding(
force_encoding: bool,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Option<(String, ProjectAssetEncoding)>, CreateEncodingError> {
match encoder {
ContentEncoder::Identity => {
Expand All @@ -309,6 +325,7 @@ async fn make_encoding(
CONTENT_ENCODING_IDENTITY,
semaphores,
logger,
progress,
)
.await
.map_err(CreateEncodingError::CreateChunkFailed)?;
Expand All @@ -331,6 +348,7 @@ async fn make_encoding(
&content_encoding,
semaphores,
logger,
progress,
)
.await
.map_err(CreateEncodingError::CreateChunkFailed)?;
Expand All @@ -349,12 +367,14 @@ async fn make_encodings(
content: &Content,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<HashMap<String, ProjectAssetEncoding>, CreateEncodingError> {
let encoders = asset_descriptor
.config
.encodings
.clone()
.unwrap_or_else(|| default_encoders(&content.media_type));

// The identity encoding is always uploaded if it's in the list of chosen encodings.
// Other encodings are only uploaded if they save bytes compared to identity.
// The encoding is forced through the filter if there is no identity encoding to compare against.
Expand All @@ -372,6 +392,7 @@ async fn make_encodings(
force_encoding,
semaphores,
logger,
progress,
)
})
.collect();
Expand All @@ -392,6 +413,7 @@ async fn make_project_asset(
canister_assets: &HashMap<String, AssetDetails>,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<ProjectAsset, CreateProjectAssetError> {
let file_size = dfx_core::fs::metadata(&asset_descriptor.source)?.len();
let permits = std::cmp::max(
Expand All @@ -412,9 +434,13 @@ async fn make_project_asset(
&content,
semaphores,
logger,
progress,
)
.await?;

if let Some(progress) = progress {
progress.increment_complete_assets();
}
Ok(ProjectAsset {
asset_descriptor,
media_type: content.media_type,
Expand All @@ -428,9 +454,13 @@ pub(crate) async fn make_project_assets(
canister_assets: &HashMap<String, AssetDetails>,
mode: Mode,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<HashMap<String, ProjectAsset>, CreateProjectAssetError> {
let semaphores = Semaphores::new();

if let Some(progress) = progress {
progress.set_total_assets(asset_descriptors.len());
}
let project_asset_futures: Vec<_> = asset_descriptors
.iter()
.map(|loc| {
Expand All @@ -440,13 +470,14 @@ pub(crate) async fn make_project_assets(
canister_assets,
&semaphores,
logger,
progress,
)
})
.collect();
let project_assets = try_join_all(project_asset_futures).await?;
if let Some(uploader) = chunk_upload_target {
uploader
.finalize_upload(&semaphores, mode)
.finalize_upload(&semaphores, mode, progress)
.await
.map_err(|err| {
CreateProjectAssetError::CreateEncodingError(
Expand All @@ -470,11 +501,14 @@ async fn upload_content_chunks(
content_encoding: &str,
semaphores: &Semaphores,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Vec<usize>, CreateChunkError> {
if content.data.is_empty() {
let empty = vec![];
let chunk_id = chunk_uploader.create_chunk(&empty, semaphores).await?;
info!(
let chunk_id = chunk_uploader
.create_chunk(&empty, semaphores, progress)
.await?;
debug!(
logger,
" {}{} 1/1 (0 bytes) sha {}",
&asset_descriptor.key,
Expand All @@ -484,16 +518,19 @@ async fn upload_content_chunks(
return Ok(vec![chunk_id]);
}

if let Some(progress) = progress {
progress.add_total_bytes(content.data.len());
}
let count = (content.data.len() + MAX_CHUNK_SIZE - 1) / MAX_CHUNK_SIZE;
let chunks_futures: Vec<_> = content
.data
.chunks(MAX_CHUNK_SIZE)
.enumerate()
.map(|(i, data_chunk)| {
chunk_uploader
.create_chunk(data_chunk, semaphores)
.create_chunk(data_chunk, semaphores, progress)
.map_ok(move |chunk_id| {
info!(
debug!(
logger,
" {}{} {}/{} ({} bytes) sha {} {}",
&asset_descriptor.key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
methods::method_names::GET_ASSET_PROPERTIES,
types::asset::{AssetDetails, AssetProperties, GetAssetPropertiesArgument},
},
AssetSyncProgressRenderer,
};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
Expand All @@ -20,9 +21,13 @@ const MAX_CONCURRENT_REQUESTS: usize = 50;
pub(crate) async fn get_assets_properties(
canister: &Canister<'_>,
canister_assets: &HashMap<String, AssetDetails>,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<HashMap<String, AssetProperties>, GetAssetPropertiesError> {
let semaphore = SharedSemaphore::new(true, MAX_CONCURRENT_REQUESTS);

if let Some(progress) = progress {
progress.set_asset_properties_to_retrieve(canister_assets.len());
}
let asset_ids = canister_assets.keys().cloned().collect::<Vec<_>>();
let futs = asset_ids
.iter()
Expand All @@ -40,7 +45,12 @@ pub(crate) async fn get_assets_properties(
let response = get_asset_properties(canister, asset_id).await;

match response {
Ok(asset_properties) => break Ok(asset_properties),
Ok(asset_properties) => {
if let Some(progress) = progress {
progress.inc_asset_properties_retrieved();
}
break Ok(asset_properties);
}
Err(agent_err) if !retryable(&agent_err) => {
break Err(agent_err);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::canister_api::types::batch_upload::common::{
CreateChunkRequest, CreateChunkResponse, CreateChunksRequest, CreateChunksResponse,
};
use crate::error::CreateChunkError;
use crate::AssetSyncProgressRenderer;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use candid::{Decode, Nat};
Expand All @@ -19,6 +20,7 @@ pub(crate) async fn create_chunk(
batch_id: &Nat,
content: &[u8],
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Nat, CreateChunkError> {
let _chunk_releaser = semaphores.create_chunk.acquire(1).await;
let batch_id = batch_id.clone();
Expand Down Expand Up @@ -58,6 +60,9 @@ pub(crate) async fn create_chunk(

match wait_result {
Ok((chunk_id,)) => {
if let Some(progress) = progress {
progress.add_uploaded_bytes(content.len());
}
return Ok(chunk_id);
}
Err(agent_err) if !retryable(&agent_err) => {
Expand All @@ -76,7 +81,9 @@ pub(crate) async fn create_chunks(
batch_id: &Nat,
content: Vec<Vec<u8>>,
semaphores: &Semaphores,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<Vec<Nat>, CreateChunkError> {
let content_byte_len = content.iter().fold(0, |acc, x| acc + x.len());
let _chunk_releaser = semaphores.create_chunk.acquire(1).await;
let batch_id = batch_id.clone();
let args = CreateChunksRequest { batch_id, content };
Expand Down Expand Up @@ -115,6 +122,9 @@ pub(crate) async fn create_chunks(

match wait_result {
Ok((chunk_ids,)) => {
if let Some(progress) = progress {
progress.add_uploaded_bytes(content_byte_len);
}
return Ok(chunk_ids);
}
Err(agent_err) if !retryable(&agent_err) => {
Expand Down
Loading
Loading