From 1c123dc2744a6fb6932239e841d942a8b41283a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Krzysztof=20Rekucki?= Date: Tue, 19 Sep 2023 16:56:56 +0200 Subject: [PATCH] fmt --- .github/workflows/integration-test.yml | 4 +- Cargo.lock | 2 +- Cargo.toml | 2 +- core/gftp/src/gftp.rs | 18 +++---- exe-unit/src/lib.rs | 26 ++++++--- exe-unit/src/manifest.rs | 6 ++- exe-unit/src/message.rs | 5 +- exe-unit/src/runtime/process.rs | 9 ++++ exe-unit/src/service/transfer.rs | 73 ++++++++++++++++++-------- exe-unit/src/state.rs | 13 +++-- utils/transfer/src/lib.rs | 11 +++- 11 files changed, 122 insertions(+), 47 deletions(-) diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 5324f4f34c..8e9a2c760d 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -30,7 +30,7 @@ jobs: integration-test-groups: name: Integration Tests (hybrid-net) group - runs-on: [goth, ubuntu-18.04] + runs-on: [goth, ubuntu-20.04] needs: test_check strategy: matrix: @@ -48,7 +48,7 @@ jobs: - name: Configure Python uses: actions/setup-python@v2 with: - python-version: "3.8.0" + python-version: "3.10.0" - name: Configure Poetry uses: Gr1N/setup-poetry@v8 diff --git a/Cargo.lock b/Cargo.lock index 88d98580e3..44e6a5b482 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7900,7 +7900,7 @@ dependencies = [ [[package]] name = "ya-client-model" version = "0.5.0" -source = "git+https://github.com/golemfactory/ya-client.git?rev=40895b19effc18b39736ad37faf90fdd1ebe7675#40895b19effc18b39736ad37faf90fdd1ebe7675" +source = "git+https://github.com/golemfactory/ya-client.git?rev=560ac2011b7f890d6051dc33d6e156d784b895b4#560ac2011b7f890d6051dc33d6e156d784b895b4" dependencies = [ "bigdecimal 0.2.2", "chrono", diff --git a/Cargo.toml b/Cargo.toml index cbd42c3836..ba0708630a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -261,7 +261,7 @@ ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev = ## CLIENT #ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" } -ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "40895b19effc18b39736ad37faf90fdd1ebe7675" } +ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "560ac2011b7f890d6051dc33d6e156d784b895b4" } ## RELAY and networking stack ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" } diff --git a/core/gftp/src/gftp.rs b/core/gftp/src/gftp.rs index f881df924e..0330b589a2 100644 --- a/core/gftp/src/gftp.rs +++ b/core/gftp/src/gftp.rs @@ -4,7 +4,6 @@ use futures::prelude::*; use rand::distributions::Alphanumeric; use rand::Rng; use sha3::{Digest, Sha3_256}; -use std::fs::{File, OpenOptions}; use std::io::{Read, Seek, SeekFrom, Write}; use std::path::Path; use std::str::FromStr; @@ -195,7 +194,7 @@ pub async fn open_for_upload(filepath: &Path) -> Result { } async fn chunk_uploaded( - file: Arc>, + file: Arc>, msg: model::UploadChunk, ) -> Result<(), model::Error> { let mut file = file.lock().await; @@ -219,7 +218,7 @@ async fn chunk_uploaded( } async fn upload_finished( - file: Arc>, + file: Arc>, msg: model::UploadFinished, ) -> Result<(), model::Error> { let mut file = file.lock().await; @@ -268,7 +267,7 @@ pub async fn upload_file(path: &Path, url: &Url) -> Result<()> { let remote = remote.clone(); async move { let chunk = chunk?; - Ok::<_, anyhow::Error>(remote.call(model::UploadChunk { chunk }).await??) + Ok::<_, Error>(remote.call(model::UploadChunk { chunk }).await??) } }) .buffered(3) @@ -276,7 +275,7 @@ pub async fn upload_file(path: &Path, url: &Url) -> Result<()> { .await?; log::debug!("Computing file hash."); - let hash = hash_file_sha256(&mut File::open(path)?)?; + let hash = hash_file_sha256(&mut fs::File::open(path)?)?; log::debug!("File [{}] has hash [{}].", path.display(), &hash); remote @@ -293,9 +292,8 @@ pub async fn upload_file(path: &Path, url: &Url) -> Result<()> { fn get_chunks( file_path: &Path, chunk_size: u64, -) -> Result> + 'static, std::io::Error> -{ - let mut file = OpenOptions::new().read(true).open(file_path)?; +) -> Result> + 'static, io::Error> { + let mut file = fs::OpenOptions::new().read(true).open(file_path)?; let file_size = file.metadata()?.len(); let n_chunks = (file_size + chunk_size - 1) / chunk_size; @@ -361,14 +359,14 @@ fn ensure_dir_exists(file_path: &Path) -> Result<()> { Ok(()) } -fn create_dest_file(file_path: &Path) -> Result { +fn create_dest_file(file_path: &Path) -> Result { ensure_dir_exists(file_path).with_context(|| { format!( "Can't create destination directory for file: [{}].", file_path.display() ) })?; - OpenOptions::new() + fs::OpenOptions::new() .read(true) .write(true) .create(true) diff --git a/exe-unit/src/lib.rs b/exe-unit/src/lib.rs index 4b96d128a4..45f998ba72 100644 --- a/exe-unit/src/lib.rs +++ b/exe-unit/src/lib.rs @@ -24,7 +24,9 @@ use crate::error::Error; use crate::message::*; use crate::runtime::*; use crate::service::metrics::MetricsService; -use crate::service::transfer::{AddVolumes, DeployImage, TransferResource, TransferService, DeployImageUpdateDetails}; +use crate::service::transfer::{ + AddVolumes, DeployImage, DeployImageUpdateDetails, TransferResource, TransferService, +}; use crate::service::{ServiceAddr, ServiceControl}; use crate::state::{ExeUnitState, StateError, Supervision}; @@ -309,16 +311,25 @@ impl RuntimeRef { }; transfer_service.send(msg).await??; } - ExeScriptCommand::Deploy { net, hosts, progress_update_interval } => { + ExeScriptCommand::Deploy { + net, + hosts, + progress, + hostname, + volumes, + env, + .. + } => { let msg = DeployImage { - update_details: progress_update_interval.map(|interval_string| { - DeployImageUpdateDetails { + update_details: progress + .as_ref() + .and_then(|progress| progress.update_interval) + .map(|interval_string| DeployImageUpdateDetails { batch_id: runtime_cmd.batch_id.clone(), idx: runtime_cmd.idx, event_tx: runtime_cmd.tx.clone(), interval: interval_string.into(), - } - }) + }), }; let task_package = transfer_service.send(msg).await??; runtime @@ -326,6 +337,9 @@ impl RuntimeRef { task_package, networks: Some(net.clone()), hosts: Some(hosts.clone()), + hostname: hostname.clone(), + volumes: Some(volumes.clone()), + env: Some(env.clone()), ..Default::default() }) .await??; diff --git a/exe-unit/src/manifest.rs b/exe-unit/src/manifest.rs index 437e9b8e36..b64172cfbe 100644 --- a/exe-unit/src/manifest.rs +++ b/exe-unit/src/manifest.rs @@ -510,7 +510,11 @@ mod tests { ExeScriptCommand::Deploy { net: Default::default(), hosts: Default::default(), - progress_update_interval: Default::default(), + hostname: None, + volumes: vec![], + env: Default::default(), + progress: Default::default() + }, ExeScriptCommand::Start { args: Default::default(), diff --git a/exe-unit/src/message.rs b/exe-unit/src/message.rs index 85ceda6d1a..52908b2819 100644 --- a/exe-unit/src/message.rs +++ b/exe-unit/src/message.rs @@ -5,12 +5,12 @@ use crate::Result; use actix::prelude::*; use futures::channel::mpsc; use serde::{Deserialize, Serialize}; -use ya_client_model::activity::runtime_event::DeployProgress; use std::collections::HashMap; use std::path::PathBuf; use ya_client_model::activity; use ya_client_model::activity::activity_state::{State, StatePair}; use ya_client_model::activity::exe_script_command::Network; +use ya_client_model::activity::runtime_event::DeployProgress; use ya_client_model::activity::{CommandOutput, ExeScriptCommand, ExeScriptCommandResult}; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Message)] @@ -176,6 +176,9 @@ pub struct UpdateDeployment { pub runtime_mode: Option, pub networks: Option>, pub hosts: Option>, + pub hostname: Option, + pub volumes: Option>, + pub env: Option>, } #[derive(Clone, Debug, Message)] diff --git a/exe-unit/src/runtime/process.rs b/exe-unit/src/runtime/process.rs index 5025fdeff2..8bfb3c95c9 100644 --- a/exe-unit/src/runtime/process.rs +++ b/exe-unit/src/runtime/process.rs @@ -456,6 +456,15 @@ impl Handler for RuntimeProcess { if let Some(hosts) = msg.hosts { self.deployment.hosts.extend(hosts.into_iter()); } + if let Some(vols) = msg.volumes { + self.deployment.volumes = vols; + } + if let Some(env) = msg.env { + self.deployment.env = env; + } + if let Some(hostname) = msg.hostname { + self.deployment.hostname = Some(hostname); + } Ok(()) } } diff --git a/exe-unit/src/service/transfer.rs b/exe-unit/src/service/transfer.rs index add8083b58..0227d85432 100644 --- a/exe-unit/src/service/transfer.rs +++ b/exe-unit/src/service/transfer.rs @@ -3,18 +3,18 @@ use std::collections::{HashMap, HashSet}; use std::io; use std::path::{Path, PathBuf}; use std::rc::Rc; -use std::sync::{Mutex, Arc}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use actix::prelude::*; -use futures::SinkExt; use futures::future::Abortable; +use futures::SinkExt; use url::Url; use ya_client_model::activity::runtime_event::DeployProgress; use crate::deploy::ContainerVolume; use crate::error::Error; -use crate::message::{Shutdown, RuntimeEvent}; +use crate::message::{RuntimeEvent, Shutdown}; use crate::util::cache::Cache; use crate::util::Abort; use crate::{ExeUnitContext, Result}; @@ -279,7 +279,11 @@ impl Handler for TransferService { if path.exists() { log::info!("Deploying cached image: {:?}", path); if let Some(update_details) = cmd.update_details.as_mut() { - let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, DeployProgress::DeployFromCache); + let event = RuntimeEvent::deploy_progress( + update_details.batch_id.clone(), + update_details.idx, + DeployProgress::DeployFromCache, + ); let _ = update_details.event_tx.send(event).await; } return Ok(Some(path)); @@ -306,7 +310,11 @@ impl Handler for TransferService { Err(_) => None, }; let progress = DeployProgress::DownloadProgress(progress, total); - let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, progress); + let event = RuntimeEvent::deploy_progress( + update_details.batch_id.clone(), + update_details.idx, + progress, + ); let _ = update_details.event_tx.send(event).await; } tokio::time::sleep(update_details.interval).await; @@ -317,36 +325,54 @@ impl Handler for TransferService { let (abort, reg) = Abort::new_pair(); { let ctx = Default::default(); - let report_progress = cmd.update_details.as_ref().map(|_| move |progress: u64, total: Option| { - if let Some(progress_container) = progress_update.upgrade() { - let mut progress_container = progress_container.lock().unwrap(); - let _ = progress_container.insert(progress); - } - if let Some(size) = total { - if let Some(total_container) = total_update.upgrade() { - let mut total_container = total_container.lock().unwrap(); - let _ = total_container.insert(size); + let report_progress = cmd.update_details.as_ref().map(|_| { + move |progress: u64, total: Option| { + if let Some(progress_container) = progress_update.upgrade() { + let mut progress_container = progress_container.lock().unwrap(); + let _ = progress_container.insert(progress); + } + if let Some(size) = total { + if let Some(total_container) = total_update.upgrade() { + let mut total_container = total_container.lock().unwrap(); + let _ = total_container.insert(size); + } } } }); - let report_retry = cmd.update_details.clone().map(|mut details| + let report_retry = cmd.update_details.clone().map(|mut details| { move |err: ya_transfer::error::Error, delay: Duration| { if let Some(progress_container) = retry_progress.upgrade() { let mut progress_container = progress_container.lock().unwrap(); let _ = progress_container.insert(0); } let progress = DeployProgress::DownloadRetry(err.to_string(), delay); - let event = RuntimeEvent::deploy_progress(details.batch_id.clone(), details.idx, progress); + let event = RuntimeEvent::deploy_progress( + details.batch_id.clone(), + details.idx, + progress, + ); let _ = futures::executor::block_on(details.event_tx.send(event)); } - ); + }); if let Some(update_details) = cmd.update_details.as_mut() { let progress = DeployProgress::DownloadingImage; - let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, progress); + let event = RuntimeEvent::deploy_progress( + update_details.batch_id.clone(), + update_details.idx, + progress, + ); let _ = update_details.event_tx.send(event).await; } - let retry = transfer_with_progress_report(src, &src_url, dst, &dst_url, &ctx, report_progress, report_retry); + let retry = transfer_with_progress_report( + src, + &src_url, + dst, + &dst_url, + &ctx, + report_progress, + report_retry, + ); let _guard = AbortHandleGuard::register(handles, abort); Ok::<_, Error>( @@ -362,10 +388,15 @@ impl Handler for TransferService { .map(|_| async { if let Some(update_details) = cmd.update_details.as_mut() { let progress = DeployProgress::DownloadFinished; - let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, progress); + let event = RuntimeEvent::deploy_progress( + update_details.batch_id.clone(), + update_details.idx, + progress, + ); let _ = update_details.event_tx.send(event).await; } - })?.await, + })? + .await, ) }?; diff --git a/exe-unit/src/state.rs b/exe-unit/src/state.rs index fe71ee4ba7..0e59991c6e 100644 --- a/exe-unit/src/state.rs +++ b/exe-unit/src/state.rs @@ -13,8 +13,8 @@ use tokio::sync::broadcast; pub use ya_client_model::activity::activity_state::{State, StatePair}; use ya_client_model::activity::exe_script_command::Network; -use ya_client_model::activity::*; use ya_client_model::activity::runtime_event::DeployProgress; +use ya_client_model::activity::*; use ya_core_model::activity::Exec; use ya_utils_networking::vpn::common::{to_ip, to_net}; use ya_utils_networking::vpn::Error as NetError; @@ -169,9 +169,13 @@ impl Batch { let message = match progress { DeployProgress::DeployFromCache => "Deploying from cached image".to_string(), DeployProgress::DownloadingImage => "Download image to deploy".to_string(), - DeployProgress::DownloadProgress(progress, total) => format!("Image download progress {progress} b / {total:?} b"), + DeployProgress::DownloadProgress(progress, total) => { + format!("Image download progress {progress} b / {total:?} b") + } DeployProgress::DownloadError(err) => format!("Image download failed {err}"), - DeployProgress::DownloadRetry(err, delay) => format!("Image download failed {err} will be retried in {delay:?}"), + DeployProgress::DownloadRetry(err, delay) => { + format!("Image download failed {err} will be retried in {delay:?}") + } DeployProgress::DownloadFinished => "Image download finished".to_string(), }; let state = self.state(idx)?; @@ -400,6 +404,9 @@ pub(crate) struct Deployment { pub task_package: Option, pub networks: HashMap, pub hosts: HashMap, + pub env: HashMap, + pub hostname: Option, + pub volumes: Vec, } #[derive(Clone, Debug)] diff --git a/utils/transfer/src/lib.rs b/utils/transfer/src/lib.rs index c9a346c54b..316fb504c7 100644 --- a/utils/transfer/src/lib.rs +++ b/utils/transfer/src/lib.rs @@ -58,7 +58,16 @@ where S: TransferProvider + ?Sized, D: TransferProvider + ?Sized, { - transfer_with_progress_report(src, src_url, dst, dst_url, ctx, None::)>, None::).await + transfer_with_progress_report( + src, + src_url, + dst, + dst_url, + ctx, + None::)>, + None::, + ) + .await } pub async fn transfer_with_progress_report(