Skip to content

Commit

Permalink
fix(autonomi): use FuturesUnordered for WASM
Browse files Browse the repository at this point in the history
Instead of JoinSet from `tokio` which uses the Tokio runtime.
  • Loading branch information
b-zee committed Oct 28, 2024
1 parent 31721be commit 26bce27
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 20 deletions.
18 changes: 6 additions & 12 deletions autonomi/src/client/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
// permissions and limitations relating to use of the SAFE Network Software.

use bytes::Bytes;
use futures::StreamExt as _;
use libp2p::kad::Quorum;
use tokio::task::{JoinError, JoinSet};

use std::collections::HashSet;
use xor_name::XorName;
Expand Down Expand Up @@ -47,8 +47,6 @@ pub enum PutError {
VaultBadOwner,
#[error("Payment unexpectedly invalid for {0:?}")]
PaymentUnexpectedlyInvalid(NetworkAddress),
#[error("Could not simultaneously upload chunks: {0:?}")]
JoinError(tokio::task::JoinError),
}

/// Errors that can occur during the pay operation.
Expand Down Expand Up @@ -80,8 +78,6 @@ pub enum GetError {
/// Errors that can occur during the cost calculation.
#[derive(Debug, thiserror::Error)]
pub enum CostError {
#[error("Could not simultaneously fetch store costs: {0:?}")]
JoinError(JoinError),
#[error("Failed to self-encrypt data.")]
SelfEncryption(#[from] crate::self_encryption::Error),
#[error("Could not get store quote for: {0:?} after several retries")]
Expand Down Expand Up @@ -135,13 +131,14 @@ impl Client {

// Upload all the chunks in parallel including the data map chunk
debug!("Uploading {} chunks", chunks.len());
let mut tasks = JoinSet::new();
let mut tasks = futures::stream::FuturesUnordered::new();

for chunk in chunks.into_iter().chain(std::iter::once(data_map_chunk)) {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
let proof_clone = proof.clone();
tasks.spawn(async move {
tasks.push(async move {
self_clone
.chunk_upload_with_payment(chunk, proof_clone)
.await
Expand All @@ -151,11 +148,8 @@ impl Client {
debug!("Chunk at {address:?} was already paid for so skipping");
}
}
while let Some(result) = tasks.join_next().await {
result
.inspect_err(|err| error!("Join error uploading chunk: {err:?}"))
.map_err(PutError::JoinError)?
.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
while let Some(result) = tasks.next().await {
result.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
record_count += 1;
}

Expand Down
13 changes: 5 additions & 8 deletions autonomi/src/client/data_private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
use std::hash::{DefaultHasher, Hash, Hasher};

use bytes::Bytes;
use futures::StreamExt as _;
use serde::{Deserialize, Serialize};
use sn_evm::{Amount, EvmWallet};
use sn_protocol::storage::Chunk;
use tokio::task::JoinSet;

use super::data::{GetError, PutError};
use crate::client::{ClientEvent, UploadSummary};
Expand Down Expand Up @@ -80,13 +80,13 @@ impl Client {
// Upload the chunks with the payments
let mut record_count = 0;
debug!("Uploading {} chunks", chunks.len());
let mut tasks = JoinSet::new();
let mut tasks = futures::stream::FuturesUnordered::new();
for chunk in chunks {
let self_clone = self.clone();
let address = *chunk.address();
if let Some(proof) = payment_proofs.get(chunk.name()) {
let proof_clone = proof.clone();
tasks.spawn(async move {
tasks.push(async move {
self_clone
.chunk_upload_with_payment(chunk, proof_clone)
.await
Expand All @@ -96,11 +96,8 @@ impl Client {
debug!("Chunk at {address:?} was already paid for so skipping");
}
}
while let Some(result) = tasks.join_next().await {
result
.inspect_err(|err| error!("Join error uploading chunk: {err:?}"))
.map_err(PutError::JoinError)?
.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
while let Some(result) = tasks.next().await {
result.inspect_err(|err| error!("Error uploading chunk: {err:?}"))?;
record_count += 1;
}

Expand Down

0 comments on commit 26bce27

Please sign in to comment.