Optimize blobfs implementation #1295

Merged 3 commits on May 25, 2023
103 changes: 53 additions & 50 deletions rafs/src/blobfs/mod.rs
@@ -12,6 +12,7 @@
 //! with heavy modification/enhancements from Alibaba Cloud OS team.
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::ffi::{CStr, CString};
 use std::fs::{create_dir_all, File};
 use std::io;
@@ -20,7 +21,7 @@ use std::os::fd::{AsRawFd, FromRawFd};
 use std::os::unix::ffi::OsStrExt;
 use std::path::Path;
 use std::str::FromStr;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, RwLock};
 use std::thread;
 
 use fuse_backend_rs::api::{filesystem::*, BackendFileSystem, VFS_MAX_INO};
@@ -82,15 +83,16 @@ struct RafsHandle {
     thread: Option<thread::JoinHandle<Result<Rafs, RafsError>>>,
 }
 
-struct BootstrapArgs {
+struct BlobfsState {
     #[allow(unused)]
     blob_cache_dir: String,
-    rafs_handle: Mutex<RafsHandle>,
+    rafs_handle: RwLock<RafsHandle>,
+    inode_map: Mutex<HashMap<Inode, (u64, String)>>,
 }
 
-impl BootstrapArgs {
+impl BlobfsState {
     fn get_rafs_handle(&self) -> io::Result<()> {
-        let mut rafs_handle = self.rafs_handle.lock().unwrap();
+        let mut rafs_handle = self.rafs_handle.write().unwrap();
 
         if let Some(handle) = rafs_handle.thread.take() {
             match handle.join() {
@@ -128,7 +130,7 @@ impl BootstrapArgs {
 /// directory ends up as the root of the file system process. One way to accomplish this is via a
 /// combination of mount namespaces and the pivot_root system call.
 pub struct BlobFs {
-    bootstrap_args: BootstrapArgs,
+    state: BlobfsState,
     pfs: PassthroughFs,
 }
 
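The doc comment above recommends jailing the blobfs daemon so the shared directory becomes its root, via mount namespaces plus pivot_root. For readers unfamiliar with that setup, here is a minimal illustrative sketch — not part of this PR — assuming Linux, the libc crate, and CAP_SYS_ADMIN, with error handling reduced to asserts:

```rust
use std::ffi::CString;
use std::fs;

// Make `new_root` the root of the current process via a private mount
// namespace and pivot_root(2). Illustrative only; a real daemon would
// report errors instead of asserting.
unsafe fn enter_root(new_root: &str) {
    // Detach from the host's mount table.
    assert_eq!(libc::unshare(libc::CLONE_NEWNS), 0);
    // Stop mount events from propagating back to the host.
    let root = CString::new("/").unwrap();
    assert_eq!(
        libc::mount(std::ptr::null(), root.as_ptr(), std::ptr::null(),
                    libc::MS_REC | libc::MS_PRIVATE, std::ptr::null()),
        0
    );
    // pivot_root requires new_root to be a mount point; bind it onto itself.
    let new_c = CString::new(new_root).unwrap();
    assert_eq!(
        libc::mount(new_c.as_ptr(), new_c.as_ptr(), std::ptr::null(),
                    libc::MS_BIND | libc::MS_REC, std::ptr::null()),
        0
    );
    let old = format!("{}/.old_root", new_root);
    fs::create_dir_all(&old).unwrap();
    let old_c = CString::new(old).unwrap();
    // pivot_root has no libc wrapper, so invoke it through syscall(2).
    assert_eq!(libc::syscall(libc::SYS_pivot_root, new_c.as_ptr(), old_c.as_ptr()), 0);
    assert_eq!(libc::chdir(root.as_ptr()), 0);
    // Lazily unmount the old root so nothing outside the jail stays reachable.
    let old_mnt = CString::new("/.old_root").unwrap();
    assert_eq!(libc::umount2(old_mnt.as_ptr(), libc::MNT_DETACH), 0);
}
```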
@@ -140,7 +142,7 @@ impl BlobFs {
 
         Ok(BlobFs {
             pfs,
-            bootstrap_args,
+            state: bootstrap_args,
         })
     }
 
@@ -163,7 +165,7 @@ impl BlobFs {
         Ok(())
     }
 
-    fn load_bootstrap(cfg: &Config) -> io::Result<BootstrapArgs> {
+    fn load_bootstrap(cfg: &Config) -> io::Result<BlobfsState> {
         let blob_ondemand_conf = BlobOndemandConfig::from_str(&cfg.blob_ondemand_cfg)?;
         if !blob_ondemand_conf.rafs_conf.validate() {
             return Err(einval!("blobfs: invlidate configuration for blobfs"));
@@ -200,54 +202,55 @@ impl BlobFs {
             thread: Some(rafs_join_handle),
         };
 
-        Ok(BootstrapArgs {
-            rafs_handle: Mutex::new(rafs_handle),
+        Ok(BlobfsState {
             blob_cache_dir: blob_ondemand_conf.blob_cache_dir.clone(),
+            rafs_handle: RwLock::new(rafs_handle),
+            inode_map: Mutex::new(HashMap::new()),
         })
     }
 
     fn get_blob_id_and_size(&self, inode: Inode) -> io::Result<(String, u64)> {
-        // locate blob file that the inode refers to
-        let blob_id_full_path = self.pfs.readlinkat_proc_file(inode)?;
-        let parent = blob_id_full_path
-            .parent()
-            .ok_or_else(|| einval!("blobfs: failed to find parent"))?;
-
-        trace!(
-            "parent: {:?}, blob id path: {:?}",
-            parent,
-            blob_id_full_path
-        );
-
-        let blob_file = Self::open_file(
-            libc::AT_FDCWD,
-            blob_id_full_path.as_path(),
-            libc::O_PATH | libc::O_NOFOLLOW | libc::O_CLOEXEC,
-            0,
-        )
-        .map_err(|e| einval!(e))?;
-        let st = Self::stat(&blob_file).map_err(|e| {
-            error!("get_blob_id_and_size: stat failed {:?}", e);
-            e
-        })?;
-        if st.st_size < 0 {
-            return Err(einval!(format!(
-                "load_chunks_on_demand: blob_id {:?}, size: {:?} is less than 0",
-                blob_id_full_path.display(),
-                st.st_size
-            )));
-        }
-
-        let blob_id = blob_id_full_path
-            .file_name()
-            .ok_or_else(|| einval!("blobfs: failed to find blob file"))?;
-        let blob_id = blob_id
-            .to_os_string()
-            .into_string()
-            .map_err(|_e| einval!("blobfs: failed to get blob id from file name"))?;
-        trace!("load_chunks_on_demand: blob_id {}", blob_id);
-
-        Ok((blob_id, st.st_size as u64))
+        let mut map = self.state.inode_map.lock().unwrap();
+        match map.entry(inode) {
+            std::collections::hash_map::Entry::Occupied(v) => {
+                let (sz, blob_id) = v.get();
+                Ok((blob_id.to_string(), *sz))
+            }
+            std::collections::hash_map::Entry::Vacant(entry) => {
+                // locate blob file that the inode refers to
+                let blob_id_full_path = self.pfs.readlinkat_proc_file(inode)?;
+                let blob_file = Self::open_file(
+                    libc::AT_FDCWD,
+                    blob_id_full_path.as_path(),
+                    libc::O_PATH | libc::O_NOFOLLOW | libc::O_CLOEXEC,
+                    0,
+                )
+                .map_err(|e| einval!(e))?;
+                let st = Self::stat(&blob_file).map_err(|e| {
+                    error!("get_blob_id_and_size: stat failed {:?}", e);
+                    e
+                })?;
+                if st.st_size < 0 {
+                    return Err(einval!(format!(
+                        "load_chunks_on_demand: blob_id {:?}, size: {:?} is less than 0",
+                        blob_id_full_path.display(),
+                        st.st_size
+                    )));
+                }
+
+                let blob_id = blob_id_full_path
+                    .file_name()
+                    .ok_or_else(|| einval!("blobfs: failed to find blob file"))?;
+                let blob_id = blob_id
+                    .to_os_string()
+                    .into_string()
+                    .map_err(|_e| einval!("blobfs: failed to get blob id from file name"))?;
+                trace!("load_chunks_on_demand: blob_id {}", blob_id);
+                entry.insert((st.st_size as u64, blob_id.clone()));
+
+                Ok((blob_id, st.st_size as u64))
+            }
+        }
     }
 
     fn stat(f: &File) -> io::Result<libc::stat64> {
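Note on the hunk above: get_blob_id_and_size previously did a readlinkat plus stat on every call; it now memoizes the (size, blob_id) pair per inode behind a Mutex<HashMap>, and the RAFS handle moves behind an RwLock so concurrent readers no longer serialize against each other. A self-contained sketch of the memoization pattern (hypothetical names, not code from this PR):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-in for the expensive readlinkat + stat work.
fn slow_lookup(inode: u64) -> (u64, String) {
    (4096, format!("blob-{}", inode))
}

struct InodeCache {
    map: Mutex<HashMap<u64, (u64, String)>>,
}

impl InodeCache {
    // Only the first call per inode pays for slow_lookup; later calls
    // clone the cached (size, blob_id) pair out of the map.
    fn get(&self, inode: u64) -> (u64, String) {
        self.map
            .lock()
            .unwrap()
            .entry(inode)
            .or_insert_with(|| slow_lookup(inode))
            .clone()
    }
}

fn main() {
    let cache = InodeCache {
        map: Mutex::new(HashMap::new()),
    };
    assert_eq!(cache.get(7), (4096, "blob-7".to_string()));
    assert_eq!(cache.get(7), (4096, "blob-7".to_string())); // cache hit
}
```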
52 changes: 33 additions & 19 deletions rafs/src/blobfs/sync_io.rs
@@ -15,16 +15,17 @@ use fuse_backend_rs::api::filesystem::{
 };
 use fuse_backend_rs::transport::FsCacheReqHandler;
 use nydus_error::eacces;
+use nydus_utils::{round_down, round_up};
 
 use super::*;
 use crate::fs::Handle;
 use crate::metadata::Inode;
 
 const MAPPING_UNIT_SIZE: u64 = 0x200000;
 
-impl BootstrapArgs {
+impl BlobfsState {
     fn fetch_range_sync(&self, prefetches: &[BlobPrefetchRequest]) -> io::Result<()> {
-        let rafs_handle = self.rafs_handle.lock().unwrap();
+        let rafs_handle = self.rafs_handle.read().unwrap();
         match rafs_handle.rafs.as_ref() {
             Some(rafs) => rafs.fetch_range_synchronous(prefetches),
             None => Err(einval!("blobfs: failed to initialize RAFS filesystem.")),
@@ -35,24 +36,24 @@
 impl BlobFs {
     // prepare BlobPrefetchRequest and call device.prefetch().
     // Make sure prefetch doesn't use delay_persist as we need the data immediately.
-    fn load_chunks_on_demand(&self, inode: Inode, offset: u64) -> io::Result<()> {
+    fn load_chunks_on_demand(&self, inode: Inode, offset: u64, len: u64) -> io::Result<()> {
         let (blob_id, size) = self.get_blob_id_and_size(inode)?;
-        let offset = offset & !(MAPPING_UNIT_SIZE - 1);
-        if size <= offset {
+        if size <= offset || offset.checked_add(len).is_none() {
             return Err(einval!(format!(
                 "blobfs: blob_id {:?}, offset {:?} is larger than size {:?}",
                 blob_id, offset, size
             )));
         }
 
-        let len = size - offset;
+        let end = std::cmp::min(offset + len, size);
+        let len = end - offset;
         let req = BlobPrefetchRequest {
             blob_id,
             offset,
-            len: std::cmp::min(len, MAPPING_UNIT_SIZE), // 2M range
+            len,
         };
 
-        self.bootstrap_args.fetch_range_sync(&[req]).map_err(|e| {
+        self.state.fetch_range_sync(&[req]).map_err(|e| {
             warn!("blobfs: failed to load data, {:?}", e);
             e
         })
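Worked example for the clamping above: with the new `len` argument, the prefetch window is exactly the caller's [offset, offset + len) intersected with the blob, instead of "everything from the 2 MiB-aligned offset, capped at one mapping unit". A standalone sketch of the same arithmetic (illustrative names, not this PR's code):

```rust
// Clamp a requested (offset, len) window to a blob of `size` bytes.
// Returns None when the window starts at or past EOF, or when
// offset + len overflows u64 -- the two cases the PR now rejects.
fn clamp_window(size: u64, offset: u64, len: u64) -> Option<(u64, u64)> {
    let end = offset.checked_add(len)?;
    if offset >= size {
        return None;
    }
    Some((offset, end.min(size) - offset))
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    // 5 MiB blob, 1 MiB requested at 4.5 MiB: only 0.5 MiB remains.
    assert_eq!(
        clamp_window(5 * MIB, 4 * MIB + MIB / 2, MIB),
        Some((4 * MIB + MIB / 2, MIB / 2))
    );
    // A request starting at EOF is rejected outright.
    assert_eq!(clamp_window(5 * MIB, 5 * MIB, MIB), None);
}
```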
@@ -64,7 +65,7 @@ impl FileSystem for BlobFs {
     type Handle = Handle;
 
     fn init(&self, capable: FsOptions) -> io::Result<FsOptions> {
-        self.bootstrap_args.get_rafs_handle()?;
+        self.state.get_rafs_handle()?;
         self.pfs.init(capable)
     }
 
@@ -193,16 +194,18 @@
 
     fn read(
         &self,
-        _ctx: &Context,
-        _inode: Inode,
-        _handle: Handle,
-        _w: &mut dyn ZeroCopyWriter,
-        _size: u32,
-        _offset: u64,
-        _lock_owner: Option<u64>,
-        _flags: u32,
+        ctx: &Context,
+        inode: Inode,
+        handle: Handle,
+        w: &mut dyn ZeroCopyWriter,
+        size: u32,
+        offset: u64,
+        lock_owner: Option<u64>,
+        flags: u32,
     ) -> io::Result<usize> {
-        Err(eacces!("Read request is not allowed in blobfs"))
+        self.load_chunks_on_demand(inode, offset, size as u64)?;
+        self.pfs
+            .read(ctx, inode, handle, w, size, offset, lock_owner, flags)
     }
 
     fn write(
Expand Down Expand Up @@ -376,7 +379,18 @@ impl FileSystem for BlobFs {
if (flags & virtio_fs::SetupmappingFlags::WRITE.bits()) != 0 {
return Err(eacces!("blob file cannot write in dax"));
}
self.load_chunks_on_demand(inode, foffset)?;
if foffset.checked_add(len).is_none() || foffset + len > u64::MAX - MAPPING_UNIT_SIZE {
return Err(einval!(format!(
"blobfs: blob_id {:?}, offset {:?} is larger than size {:?}",
blob_id, offset, size
)));
}

let end = round_up(foffset + len, MAPPING_UNIT_SIZE);
let offset = round_down(foffset, MAPPING_UNIT_SIZE);
let len = end - offset;
self.load_chunks_on_demand(inode, offset, len)?;

self.pfs
.setupmapping(_ctx, inode, _handle, foffset, len, flags, moffset, vu_req)
}
Expand Down
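In the hunk above, round_down/round_up align the DAX window to whole 2 MiB mapping units before the chunks are loaded, and the extra u64::MAX - MAPPING_UNIT_SIZE guard keeps round_up's internal "+ align - 1" from overflowing. Assuming the usual power-of-two rounding semantics for nydus_utils::{round_down, round_up}, the arithmetic works out as in this sketch:

```rust
// Power-of-two rounding, assumed to match the semantics of
// nydus_utils::{round_up, round_down}; shown here for illustration.
const MAPPING_UNIT_SIZE: u64 = 0x200000; // 2 MiB

fn round_down(v: u64, align: u64) -> u64 {
    v & !(align - 1)
}

fn round_up(v: u64, align: u64) -> u64 {
    round_down(v + align - 1, align)
}

fn main() {
    // A 1 MiB mapping request at foffset = 3 MiB expands to the aligned
    // window [2 MiB, 4 MiB) before the chunks are prefetched.
    let (foffset, len) = (3 * 1024 * 1024u64, 1024 * 1024u64);
    let end = round_up(foffset + len, MAPPING_UNIT_SIZE);
    let offset = round_down(foffset, MAPPING_UNIT_SIZE);
    assert_eq!(offset, 2 * 1024 * 1024);
    assert_eq!(end, 4 * 1024 * 1024);
    assert_eq!(end - offset, MAPPING_UNIT_SIZE); // one full mapping unit
}
```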