Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Sep 28, 2023
1 parent 7c52fce commit e804063
Show file tree
Hide file tree
Showing 3 changed files with 407 additions and 346 deletions.
352 changes: 6 additions & 346 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,362 +1,22 @@
#[macro_use]
extern crate log;
use anyhow::Context;
use fungi::meta::Ino;
use fungi::Writer;
use nix::unistd::{fchownat, FchownatFlags, Gid, Uid};
use std::collections::LinkedList;
use std::ffi::OsString;
use std::fs::Metadata;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{ffi::OsStr, fs};
use store::Store;
use workers::WorkerPool;

const BLOB_SIZE: usize = 512 * 1024; // 512K
const PARALLEL_UPLOAD: usize = 10; // number of files we can upload in parallel

pub mod cache;
pub mod fungi;
pub mod store;

use cache::Cache;
use fungi::{
meta::{FileType, Inode, Result, Walk, WalkVisitor},
Reader,
};

use crate::store::BlockStore;

/// Visitor used by [`unpack`] to materialize an FL on disk: as the metadata
/// tree is walked it recreates directories, regular files, and symlinks
/// under `root`, fetching file content through `cache`.
struct CopyVisitor<'a, S>
where
    S: store::Store,
{
    // when true, restore the recorded uid/gid ownership of each entry
    preserve: bool,
    // metadata reader the walk originates from (used to resolve file blocks)
    meta: &'a fungi::Reader,
    // content cache used to download file data into the created files
    cache: &'a cache::Cache<S>,
    // target directory the FL paths are re-rooted under
    root: &'a Path,
}

impl<'a, S> CopyVisitor<'a, S>
where
S: store::Store,
{
pub fn new(
meta: &'a fungi::Reader,
cache: &'a Cache<S>,
root: &'a Path,
preserve: bool,
) -> Self {
Self {
meta,
cache,
root,
preserve,
}
}
}

#[async_trait::async_trait]
impl<'a, S> WalkVisitor for CopyVisitor<'a, S>
where
    S: Store,
{
    /// Recreate a single filesystem entry (`node`) at `path`, re-rooted
    /// under `self.root`, then continue the walk. Unknown file kinds are
    /// logged and skipped rather than failing the whole unpack.
    async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk> {
        use std::fs::Permissions;
        use std::os::unix::fs::PermissionsExt;
        use tokio::fs::OpenOptions;

        // `path` is absolute inside the FL; strip the leading `/` so join
        // places it under the unpack target.
        let rooted = self.root.join(path.strip_prefix("/").unwrap());

        match node.mode.file_type() {
            FileType::Dir => {
                fs::create_dir_all(&rooted)
                    .with_context(|| format!("failed to create directory '{:?}'", rooted))?;
            }
            FileType::Regular => {
                // create_new guarantees we never clobber an existing file
                // (and makes the previous `truncate(true)` redundant, so it
                // was dropped).
                let mut fd = OpenOptions::new()
                    .create_new(true)
                    .write(true)
                    .open(&rooted)
                    .await
                    .with_context(|| format!("failed to create file '{:?}'", rooted))?;

                let blocks = self.meta.blocks(node.ino).await?;
                self.cache
                    .direct(&blocks, &mut fd)
                    .await
                    .with_context(|| format!("failed to download file '{:?}'", rooted))?;

                fd.set_permissions(Permissions::from_mode(node.mode.mode()))
                    .await?;
            }
            FileType::Link => {
                let target = node
                    .data
                    .as_deref()
                    .ok_or_else(|| anyhow::anyhow!("link has no target path"))?;

                let target = Path::new(OsStr::from_bytes(target));
                let target = if target.is_relative() {
                    target.to_owned()
                } else {
                    // BUGFIX: Path::join replaces the base entirely when its
                    // argument is absolute, so `self.root.join(target)` was a
                    // no-op for absolute targets. Strip the leading `/` first
                    // so the link target is actually re-rooted under root.
                    self.root.join(target.strip_prefix("/").unwrap_or(target))
                };

                std::os::unix::fs::symlink(target, &rooted)
                    .with_context(|| format!("failed to create symlink '{:?}'", rooted))?;
            }
            _ => {
                warn!("unknown file kind: {:?}", node.mode.file_type());
                return Ok(Walk::Continue);
            }
        };

        if self.preserve {
            // restore ownership without following symlinks, so the link
            // itself (not its target) receives the recorded uid/gid
            fchownat(
                None,
                &rooted,
                Some(Uid::from_raw(node.uid)),
                Some(Gid::from_raw(node.gid)),
                FchownatFlags::NoFollowSymlink,
            )
            .with_context(|| format!("failed to change ownership of '{:?}'", &rooted))?;
        }

        Ok(Walk::Continue)
    }
}

/// Unpack an FL to the given `root` location: downloads all file data and
/// reconstructs the filesystem tree on disk. When `preserve` is set, the
/// recorded uid/gid ownership is restored on each created entry.
pub async fn unpack<P: AsRef<Path>, S: Store>(
    meta: &Reader,
    cache: &Cache<S>,
    root: P,
    preserve: bool,
) -> Result<()> {
    let root = root.as_ref();
    let mut visitor = CopyVisitor::new(meta, cache, root, preserve);
    meta.walk(&mut visitor).await
}

/// Work item queued during packing: (parent inode, full on-disk path,
/// entry name, cached file metadata).
#[derive(Debug)]
struct Item(Ino, PathBuf, OsString, Metadata);
/// Creates an FL from the given root location. It takes ownership of the
/// writer because it's logically incorrect to store multiple filesystems
/// in the same FL.
/// All file chunks will then be uploaded to the provided store.
pub async fn pack<P: Into<PathBuf>, S: Store>(writer: Writer, store: S, root: P) -> Result<()> {
    use tokio::fs;

    // building routing table from store information
    for route in store.routes() {
        writer
            .route(
                route.start.unwrap_or(u8::MIN),
                route.end.unwrap_or(u8::MAX),
                route.url,
            )
            .await?;
    }

    let store: BlockStore<S> = store.into();

    let root = root.into();
    let meta = fs::metadata(&root)
        .await
        .context("failed to get root stats")?;

    // queue of directories still to be processed (drained back-to-front
    // by the loop below)
    let mut list = LinkedList::default();

    let uploader = Uploader::new(store, writer.clone());
    let mut pool = workers::WorkerPool::new(uploader.clone(), PARALLEL_UPLOAD);

    pack_one(
        &mut list,
        &writer,
        &mut pool,
        Item(0, root, OsString::from("/"), meta),
    )
    .await?;

    // while-let replaces the is_empty()/pop_back().unwrap() pair
    while let Some(dir) = list.pop_back() {
        pack_one(&mut list, &writer, &mut pool, dir).await?;
    }

    // wait for all in-flight uploads to finish before returning
    pool.close().await;
    Ok(())
}

/// pack_one is called for each dir: it writes the directory's own inode,
/// then an inode for every child — queuing sub-directories on `list` and
/// scheduling regular-file content uploads on the worker `pool`.
///
/// NOTE(review): stray `mod pack;` / `pub use ...` declarations that had
/// been spliced into the middle of this function body were removed; module
/// declarations belong at the top level of the crate.
async fn pack_one<S: Store>(
    list: &mut LinkedList<Item>,
    writer: &Writer,
    pool: &mut WorkerPool<Uploader<S>>,
    Item(parent, path, name, meta): Item,
) -> Result<()> {
    use std::os::unix::fs::MetadataExt;
    use tokio::fs;

    let current = writer
        .inode(Inode {
            ino: 0, // presumably the writer assigns the real inode number — verify
            name: String::from_utf8_lossy(name.as_bytes()).into_owned(),
            parent,
            size: meta.size(),
            uid: meta.uid(),
            gid: meta.gid(),
            mode: meta.mode().into(),
            rdev: meta.rdev(),
            ctime: meta.ctime(),
            mtime: meta.mtime(),
            data: None,
        })
        .await?;

    let mut children = fs::read_dir(&path)
        .await
        .context("failed to list dir children")?;

    while let Some(child) = children
        .next_entry()
        .await
        .context("failed to read next entry from directory")?
    {
        let name = child.file_name();
        let meta = child.metadata().await?;
        let child_path = path.join(&name);

        // if this child is a directory we add it to the tail of the list
        // and process it in a later pack_one call
        if meta.is_dir() {
            list.push_back(Item(current, child_path.clone(), name, meta));
            continue;
        }

        // symlinks store their target path as the inode's data payload
        let data = if meta.is_symlink() {
            let target = fs::read_link(&child_path).await?;
            Some(target.as_os_str().as_bytes().into())
        } else {
            None
        };

        let child_ino = writer
            .inode(Inode {
                ino: 0,
                name: String::from_utf8_lossy(name.as_bytes()).into_owned(),
                parent: current,
                size: meta.size(),
                uid: meta.uid(),
                gid: meta.gid(),
                mode: meta.mode().into(),
                rdev: meta.rdev(),
                ctime: meta.ctime(),
                mtime: meta.mtime(),
                data,
            })
            .await?;

        // only regular files have content to upload
        if !meta.is_file() {
            continue;
        }

        let worker = pool.get().await;
        worker
            .send((child_ino, child_path))
            .context("failed to schedule file upload")?;
    }
    Ok(())
}

/// Worker state for uploading file content to the block store: each worker
/// reads file chunks into its own scratch `buffer` before pushing them to
/// the store and recording them in the meta writer.
struct Uploader<S>
where
    S: Store,
{
    // shared handle to the block store all workers upload through
    store: Arc<BlockStore<S>>,
    // meta writer used to record (ino, block id, key) triples
    writer: Writer,
    // per-worker scratch space of BLOB_SIZE bytes; contents are transient
    buffer: [u8; BLOB_SIZE],
}

impl<S> Clone for Uploader<S>
where
    S: Store,
{
    // Manual Clone: the scratch buffer is re-initialized to zeros rather
    // than copied — presumably because its contents only matter during a
    // single upload, so copying 512K per clone would be wasted work.
    fn clone(&self) -> Self {
        Self {
            store: Arc::clone(&self.store),
            writer: self.writer.clone(),
            buffer: [0; BLOB_SIZE],
        }
    }
}

impl<S> Uploader<S>
where
    S: Store,
{
    /// Create an uploader bound to the given block store and meta writer.
    fn new(store: BlockStore<S>, writer: Writer) -> Self {
        Self {
            store: Arc::new(store),
            writer,
            buffer: [0; BLOB_SIZE],
        }
    }

    /// Upload the content of the file at `path` in chunks of up to
    /// BLOB_SIZE bytes, recording each stored block against inode `ino`
    /// in the meta writer.
    async fn upload(&mut self, ino: Ino, path: &Path) -> Result<()> {
        use tokio::fs;
        use tokio::io::AsyncReadExt;
        use tokio::io::BufReader;

        // File::open is the idiomatic read-only open (replaces
        // OpenOptions::default().read(true)); context names the offending
        // path on failure instead of a bare io error.
        let fd = fs::File::open(path)
            .await
            .with_context(|| format!("failed to open file '{:?}'", path))?;

        let mut reader = BufReader::new(fd);
        loop {
            // a read of 0 bytes signals EOF
            let size = reader.read(&mut self.buffer).await?;
            if size == 0 {
                break;
            }

            // write the chunk to the remote store...
            let block = self
                .store
                .set(&self.buffer[..size])
                .await
                .context("failed to store blob")?;

            // ...then record the block (id + key) against the inode
            self.writer.block(ino, &block.id, &block.key).await?;
        }

        Ok(())
    }
}

#[async_trait::async_trait]
impl<S> workers::Work for Uploader<S>
where
    S: Store,
{
    // input: the inode to attach blocks to, plus the on-disk file path
    type Input = (Ino, PathBuf);
    type Output = ();

    /// Worker entry point: upload one file. Errors are logged rather than
    /// propagated (Output is `()`), so a single failed upload does not
    /// surface to the caller through the pool.
    async fn run(&mut self, (ino, path): Self::Input) -> Self::Output {
        if let Err(err) = self.upload(ino, &path).await {
            log::error!("failed to upload file ({:?}): {}", path, err);
        }
    }
}
// NOTE(review): removed a duplicate `PARALLEL_UPLOAD` definition — the
// canonical constant is already declared near the top of this file, and a
// second `const` with the same name in the same scope is a compile error
// (E0428: name defined multiple times).

#[cfg(test)]
mod test {
use super::*;
use crate::{
cache::Cache,
fungi::meta,
store::{dir::DirStore, Router},
};
Expand Down
Loading

0 comments on commit e804063

Please sign in to comment.