Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed Sep 19, 2023
1 parent 8a19231 commit a9f9932
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 47 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:

integration-test-groups:
name: Integration Tests (hybrid-net) group
runs-on: [goth, ubuntu-18.04]
runs-on: [goth, ubuntu-20.04]
needs: test_check
strategy:
matrix:
Expand All @@ -48,7 +48,7 @@ jobs:
- name: Configure Python
uses: actions/setup-python@v2
with:
python-version: "3.8.0"
python-version: "3.10.0"

- name: Configure Poetry
uses: Gr1N/setup-poetry@v8
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ ya-sb-util = { git = "https://github.com/golemfactory/ya-service-bus.git", rev =

## CLIENT
#ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" }
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "40895b19effc18b39736ad37faf90fdd1ebe7675" }
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "560ac2011b7f890d6051dc33d6e156d784b895b4" }

## RELAY and networking stack
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "c92a75b0cf062fcc9dbb3ea2a034d913e5fad8e5" }
Expand Down
18 changes: 8 additions & 10 deletions core/gftp/src/gftp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use futures::prelude::*;
use rand::distributions::Alphanumeric;
use rand::Rng;
use sha3::{Digest, Sha3_256};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::str::FromStr;
Expand Down Expand Up @@ -195,7 +194,7 @@ pub async fn open_for_upload(filepath: &Path) -> Result<Url> {
}

async fn chunk_uploaded(
file: Arc<Mutex<File>>,
file: Arc<Mutex<fs::File>>,
msg: model::UploadChunk,
) -> Result<(), model::Error> {
let mut file = file.lock().await;
Expand All @@ -219,7 +218,7 @@ async fn chunk_uploaded(
}

async fn upload_finished(
file: Arc<Mutex<File>>,
file: Arc<Mutex<fs::File>>,
msg: model::UploadFinished,
) -> Result<(), model::Error> {
let mut file = file.lock().await;
Expand Down Expand Up @@ -268,15 +267,15 @@ pub async fn upload_file(path: &Path, url: &Url) -> Result<()> {
let remote = remote.clone();
async move {
let chunk = chunk?;
Ok::<_, anyhow::Error>(remote.call(model::UploadChunk { chunk }).await??)
Ok::<_, Error>(remote.call(model::UploadChunk { chunk }).await??)
}
})
.buffered(3)
.try_for_each(|_| future::ok(()))
.await?;

log::debug!("Computing file hash.");
let hash = hash_file_sha256(&mut File::open(path)?)?;
let hash = hash_file_sha256(&mut fs::File::open(path)?)?;

log::debug!("File [{}] has hash [{}].", path.display(), &hash);
remote
Expand All @@ -293,9 +292,8 @@ pub async fn upload_file(path: &Path, url: &Url) -> Result<()> {
fn get_chunks(
file_path: &Path,
chunk_size: u64,
) -> Result<impl Iterator<Item = Result<model::GftpChunk, std::io::Error>> + 'static, std::io::Error>
{
let mut file = OpenOptions::new().read(true).open(file_path)?;
) -> Result<impl Iterator<Item = Result<model::GftpChunk, io::Error>> + 'static, io::Error> {
let mut file = fs::OpenOptions::new().read(true).open(file_path)?;

let file_size = file.metadata()?.len();
let n_chunks = (file_size + chunk_size - 1) / chunk_size;
Expand Down Expand Up @@ -361,14 +359,14 @@ fn ensure_dir_exists(file_path: &Path) -> Result<()> {
Ok(())
}

fn create_dest_file(file_path: &Path) -> Result<File> {
fn create_dest_file(file_path: &Path) -> Result<fs::File> {
ensure_dir_exists(file_path).with_context(|| {
format!(
"Can't create destination directory for file: [{}].",
file_path.display()
)
})?;
OpenOptions::new()
fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
Expand Down
26 changes: 20 additions & 6 deletions exe-unit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use crate::error::Error;
use crate::message::*;
use crate::runtime::*;
use crate::service::metrics::MetricsService;
use crate::service::transfer::{AddVolumes, DeployImage, TransferResource, TransferService, DeployImageUpdateDetails};
use crate::service::transfer::{
AddVolumes, DeployImage, DeployImageUpdateDetails, TransferResource, TransferService,
};
use crate::service::{ServiceAddr, ServiceControl};
use crate::state::{ExeUnitState, StateError, Supervision};

Expand Down Expand Up @@ -309,23 +311,35 @@ impl<R: Runtime> RuntimeRef<R> {
};
transfer_service.send(msg).await??;
}
ExeScriptCommand::Deploy { net, hosts, progress_update_interval } => {
ExeScriptCommand::Deploy {
net,
hosts,
progress,
hostname,
volumes,
env,
..
} => {
let msg = DeployImage {
update_details: progress_update_interval.map(|interval_string| {
DeployImageUpdateDetails {
update_details: progress
.as_ref()
.and_then(|progress| progress.update_interval)
.map(|interval_string| DeployImageUpdateDetails {
batch_id: runtime_cmd.batch_id.clone(),
idx: runtime_cmd.idx,
event_tx: runtime_cmd.tx.clone(),
interval: interval_string.into(),
}
})
}),
};
let task_package = transfer_service.send(msg).await??;
runtime
.send(UpdateDeployment {
task_package,
networks: Some(net.clone()),
hosts: Some(hosts.clone()),
hostname: hostname.clone(),
volumes: Some(volumes.clone()),
env: Some(env.clone()),
..Default::default()
})
.await??;
Expand Down
5 changes: 4 additions & 1 deletion exe-unit/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ mod tests {
ExeScriptCommand::Deploy {
net: Default::default(),
hosts: Default::default(),
progress_update_interval: Default::default(),
hostname: None,
volumes: vec![],
env: Default::default(),
progress: Default::default(),
},
ExeScriptCommand::Start {
args: Default::default(),
Expand Down
5 changes: 4 additions & 1 deletion exe-unit/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use crate::Result;
use actix::prelude::*;
use futures::channel::mpsc;
use serde::{Deserialize, Serialize};
use ya_client_model::activity::runtime_event::DeployProgress;
use std::collections::HashMap;
use std::path::PathBuf;
use ya_client_model::activity;
use ya_client_model::activity::activity_state::{State, StatePair};
use ya_client_model::activity::exe_script_command::Network;
use ya_client_model::activity::runtime_event::DeployProgress;
use ya_client_model::activity::{CommandOutput, ExeScriptCommand, ExeScriptCommandResult};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Message)]
Expand Down Expand Up @@ -176,6 +176,9 @@ pub struct UpdateDeployment {
pub runtime_mode: Option<RuntimeMode>,
pub networks: Option<Vec<Network>>,
pub hosts: Option<HashMap<String, String>>,
pub hostname: Option<String>,
pub volumes: Option<Vec<String>>,
pub env: Option<HashMap<String, String>>,
}

#[derive(Clone, Debug, Message)]
Expand Down
9 changes: 9 additions & 0 deletions exe-unit/src/runtime/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,15 @@ impl Handler<UpdateDeployment> for RuntimeProcess {
if let Some(hosts) = msg.hosts {
self.deployment.hosts.extend(hosts.into_iter());
}
if let Some(vols) = msg.volumes {
self.deployment.volumes = vols;
}
if let Some(env) = msg.env {
self.deployment.env = env;
}
if let Some(hostname) = msg.hostname {
self.deployment.hostname = Some(hostname);
}
Ok(())
}
}
Expand Down
73 changes: 52 additions & 21 deletions exe-unit/src/service/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ use std::collections::{HashMap, HashSet};
use std::io;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::{Mutex, Arc};
use std::sync::{Arc, Mutex};

Check failure on line 6 in exe-unit/src/service/transfer.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused imports: `Arc`, `Mutex`
use std::time::Duration;

use actix::prelude::*;
use futures::SinkExt;
use futures::future::Abortable;
use futures::SinkExt;

Check failure on line 11 in exe-unit/src/service/transfer.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused import: `futures::SinkExt`
use url::Url;
use ya_client_model::activity::runtime_event::DeployProgress;

Check failure on line 13 in exe-unit/src/service/transfer.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused import: `ya_client_model::activity::runtime_event::DeployProgress`

use crate::deploy::ContainerVolume;
use crate::error::Error;
use crate::message::{Shutdown, RuntimeEvent};
use crate::message::{RuntimeEvent, Shutdown};

Check failure on line 17 in exe-unit/src/service/transfer.rs

View workflow job for this annotation

GitHub Actions / Unit Tests (ubuntu-latest)

unused import: `RuntimeEvent`
use crate::util::cache::Cache;
use crate::util::Abort;
use crate::{ExeUnitContext, Result};
Expand Down Expand Up @@ -279,7 +279,11 @@ impl Handler<DeployImage> for TransferService {
if path.exists() {
log::info!("Deploying cached image: {:?}", path);
if let Some(update_details) = cmd.update_details.as_mut() {
let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, DeployProgress::DeployFromCache);
let event = RuntimeEvent::deploy_progress(
update_details.batch_id.clone(),
update_details.idx,
DeployProgress::DeployFromCache,
);
let _ = update_details.event_tx.send(event).await;
}
return Ok(Some(path));
Expand All @@ -306,7 +310,11 @@ impl Handler<DeployImage> for TransferService {
Err(_) => None,
};
let progress = DeployProgress::DownloadProgress(progress, total);
let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, progress);
let event = RuntimeEvent::deploy_progress(
update_details.batch_id.clone(),
update_details.idx,
progress,
);
let _ = update_details.event_tx.send(event).await;
}
tokio::time::sleep(update_details.interval).await;
Expand All @@ -317,36 +325,54 @@ impl Handler<DeployImage> for TransferService {
let (abort, reg) = Abort::new_pair();
{
let ctx = Default::default();
let report_progress = cmd.update_details.as_ref().map(|_| move |progress: u64, total: Option<u64>| {
if let Some(progress_container) = progress_update.upgrade() {
let mut progress_container = progress_container.lock().unwrap();
let _ = progress_container.insert(progress);
}
if let Some(size) = total {
if let Some(total_container) = total_update.upgrade() {
let mut total_container = total_container.lock().unwrap();
let _ = total_container.insert(size);
let report_progress = cmd.update_details.as_ref().map(|_| {
move |progress: u64, total: Option<u64>| {
if let Some(progress_container) = progress_update.upgrade() {
let mut progress_container = progress_container.lock().unwrap();
let _ = progress_container.insert(progress);
}
if let Some(size) = total {
if let Some(total_container) = total_update.upgrade() {
let mut total_container = total_container.lock().unwrap();
let _ = total_container.insert(size);
}
}
}
});
let report_retry = cmd.update_details.clone().map(|mut details|
let report_retry = cmd.update_details.clone().map(|mut details| {
move |err: ya_transfer::error::Error, delay: Duration| {
if let Some(progress_container) = retry_progress.upgrade() {
let mut progress_container = progress_container.lock().unwrap();
let _ = progress_container.insert(0);
}
let progress = DeployProgress::DownloadRetry(err.to_string(), delay);
let event = RuntimeEvent::deploy_progress(details.batch_id.clone(), details.idx, progress);
let event = RuntimeEvent::deploy_progress(
details.batch_id.clone(),
details.idx,
progress,
);
let _ = futures::executor::block_on(details.event_tx.send(event));
}
);
});
if let Some(update_details) = cmd.update_details.as_mut() {
let progress = DeployProgress::DownloadingImage;
let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, progress);
let event = RuntimeEvent::deploy_progress(
update_details.batch_id.clone(),
update_details.idx,
progress,
);
let _ = update_details.event_tx.send(event).await;
}

let retry = transfer_with_progress_report(src, &src_url, dst, &dst_url, &ctx, report_progress, report_retry);
let retry = transfer_with_progress_report(
src,
&src_url,
dst,
&dst_url,
&ctx,
report_progress,
report_retry,
);

let _guard = AbortHandleGuard::register(handles, abort);
Ok::<_, Error>(
Expand All @@ -362,10 +388,15 @@ impl Handler<DeployImage> for TransferService {
.map(|_| async {
if let Some(update_details) = cmd.update_details.as_mut() {
let progress = DeployProgress::DownloadFinished;
let event = RuntimeEvent::deploy_progress(update_details.batch_id.clone(), update_details.idx, progress);
let event = RuntimeEvent::deploy_progress(
update_details.batch_id.clone(),
update_details.idx,
progress,
);
let _ = update_details.event_tx.send(event).await;
}
})?.await,
})?
.await,
)
}?;

Expand Down
13 changes: 10 additions & 3 deletions exe-unit/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use tokio::sync::broadcast;

pub use ya_client_model::activity::activity_state::{State, StatePair};
use ya_client_model::activity::exe_script_command::Network;
use ya_client_model::activity::*;
use ya_client_model::activity::runtime_event::DeployProgress;
use ya_client_model::activity::*;
use ya_core_model::activity::Exec;
use ya_utils_networking::vpn::common::{to_ip, to_net};
use ya_utils_networking::vpn::Error as NetError;
Expand Down Expand Up @@ -169,9 +169,13 @@ impl Batch {
let message = match progress {
DeployProgress::DeployFromCache => "Deploying from cached image".to_string(),
DeployProgress::DownloadingImage => "Download image to deploy".to_string(),
DeployProgress::DownloadProgress(progress, total) => format!("Image download progress {progress} b / {total:?} b"),
DeployProgress::DownloadProgress(progress, total) => {
format!("Image download progress {progress} b / {total:?} b")
}
DeployProgress::DownloadError(err) => format!("Image download failed {err}"),
DeployProgress::DownloadRetry(err, delay) => format!("Image download failed {err} will be retried in {delay:?}"),
DeployProgress::DownloadRetry(err, delay) => {
format!("Image download failed {err} will be retried in {delay:?}")
}
DeployProgress::DownloadFinished => "Image download finished".to_string(),
};
let state = self.state(idx)?;
Expand Down Expand Up @@ -400,6 +404,9 @@ pub(crate) struct Deployment {
pub task_package: Option<PathBuf>,
pub networks: HashMap<String, DeploymentNetwork>,
pub hosts: HashMap<String, String>,
pub env: HashMap<String, String>,
pub hostname: Option<String>,
pub volumes: Vec<String>,
}

#[derive(Clone, Debug)]
Expand Down
Loading

0 comments on commit a9f9932

Please sign in to comment.