Skip to content

Commit

Permalink
chore: move copy to dynamic
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 15, 2024
1 parent 91818fe commit a86969d
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 50 deletions.
47 changes: 45 additions & 2 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::pin::Pin;
use std::{cmp, pin::Pin, sync::Arc};

use futures_core::Stream;

use super::MaybeSendFuture;
use crate::{
buf::IoBufMut,
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Write,
};
Expand Down Expand Up @@ -48,6 +48,8 @@ impl<'write> Write for Box<dyn DynFile + 'write> {
}

pub trait DynFs: MaybeSend + MaybeSync {
fn file_system(&self) -> FileSystemTag;

fn open<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -99,6 +101,10 @@ pub trait DynFs: MaybeSend + MaybeSync {
}

impl<F: Fs> DynFs for F {
fn file_system(&self) -> FileSystemTag {
Fs::file_system(self)
}

fn open_options<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -160,6 +166,43 @@ impl<F: Fs> DynFs for F {
}
}

pub async fn copy(
from_fs: &Arc<dyn DynFs>,
from: &Path,
to_fs: &Arc<dyn DynFs>,
to: &Path,
) -> Result<(), Error> {
if from_fs.file_system() == to_fs.file_system() {
from_fs.copy(from, to).await?;
return Ok(());
}
let mut from_file = from_fs
.open_options(from, OpenOptions::default().read(true))
.await?;
let from_file_size = DynRead::size(&from_file).await? as usize;

let mut to_file = to_fs
.open_options(to, OpenOptions::default().create(true).write(true))
.await?;
let buf_size = cmp::min(from_file_size, 4 * 1024);
let mut buf = Some(vec![0u8; buf_size]);
let mut read_pos = 0u64;

while (read_pos as usize) < from_file_size - 1 {
let tmp = buf.take().unwrap();
let (result, tmp) = Read::read_exact_at(&mut from_file, tmp, read_pos).await;
result?;
read_pos += tmp.bytes_init() as u64;

let (result, tmp) = Write::write_all(&mut to_file, tmp).await;
result?;
buf = Some(tmp);
}
DynWrite::close(&mut to_file).await?;

Ok(())
}

#[cfg(test)]
mod tests {

Expand Down
63 changes: 17 additions & 46 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod options;

use std::{cmp, future::Future};
use std::future::Future;

use futures_core::Stream;
pub use options::*;
Expand Down Expand Up @@ -55,44 +55,9 @@ pub trait Fs: MaybeSend + MaybeSync {
fn link(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;
}

pub async fn copy<F, T>(from_fs: &F, from: &Path, to_fs: &T, to: &Path) -> Result<(), Error>
where
F: Fs,
T: Fs,
{
if from_fs.file_system() == to_fs.file_system() {
from_fs.copy(from, to).await?;
return Ok(());
}
let mut from_file = from_fs
.open_options(from, OpenOptions::default().read(true))
.await?;
let from_file_size = from_file.size().await? as usize;

let mut to_file = to_fs
.open_options(to, OpenOptions::default().create(true).write(true))
.await?;
let buf_size = cmp::min(from_file_size, 4 * 1024);
let mut buf = Some(vec![0u8; buf_size]);
let mut read_pos = 0u64;

while (read_pos as usize) < from_file_size - 1 {
let tmp = buf.take().unwrap();
let (result, tmp) = from_file.read_exact_at(tmp, read_pos).await;
result?;
read_pos += tmp.len() as u64;

let (result, tmp) = to_file.write_all(tmp).await;
result?;
buf = Some(tmp);
}
to_file.close().await?;

Ok(())
}

#[cfg(test)]
mod tests {
use crate::DynFs;

#[ignore]
#[cfg(all(
Expand All @@ -108,13 +73,12 @@ mod tests {
use tempfile::TempDir;

use crate::{
fs,
fs::{Fs, OpenOptions},
impls::disk::tokio::fs::TokioFs,
path::Path,
remotes::{
aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File},
http::{tokio::TokioClient, DynHttpClient, HttpClient},
http::tokio::TokioClient,
},
Read, Write,
};
Expand All @@ -141,22 +105,29 @@ mod tests {
checksum: false,
};

let s3_fs = AmazonS3::new(Box::new(client), options);
let local_fs = TokioFs;
let s3_fs = Arc::new(AmazonS3::new(Box::new(client), options));
let local_fs = Arc::new(TokioFs);

{
let mut local_file = local_fs
.open_options(&local_path, OpenOptions::default().create(true).write(true))
.await?;
let mut local_file = Fs::open_options(
local_fs.as_ref(),
&local_path,
OpenOptions::default().create(true).write(true),
)
.await?;
local_file
.write_all("🎵never gonna give you up🎵".as_bytes())
.await
.0?;
local_file.close().await.unwrap();
}
fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?;
{
let s3_fs = s3_fs.clone() as Arc<dyn DynFs>;
let local_fs = local_fs.clone() as Arc<dyn DynFs>;
crate::dynamic::fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?;
}

let mut s3 = S3File::new(s3_fs, s3_path.clone());
let mut s3 = S3File::new(Arc::into_inner(s3_fs).unwrap(), s3_path.clone());

let size = s3.size().await.unwrap();
assert_eq!(size, 31);
Expand Down
4 changes: 2 additions & 2 deletions fusio/src/impls/disk/monoio/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Fs for MonoIoFs {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

let _ = monoio::spawn(async move { fs::copy(&from, &to) }).await?;
monoio::spawn(async move { fs::copy(&from, &to) }).await?;

Ok(())
}
Expand All @@ -74,7 +74,7 @@ impl Fs for MonoIoFs {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

let _ = monoio::spawn(async move { fs::hard_link(&from, &to) }).await?;
monoio::spawn(async move { fs::hard_link(&from, &to) }).await?;

Ok(())
}
Expand Down

0 comments on commit a86969d

Please sign in to comment.