Skip to content

Commit

Permalink
Separation of cache and store
Browse files Browse the repository at this point in the history
Also a lot of clean up and refactor
  • Loading branch information
muhamadazmy committed Sep 19, 2023
1 parent bb44194 commit 1ce3cad
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 184 deletions.
145 changes: 20 additions & 125 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,134 +1,35 @@
use crate::fungi::meta::Block;
use crate::store::Store;
use anyhow::{Context, Result};
use bb8_redis::redis::aio::Connection;

use bb8_redis::{
bb8::{CustomizeConnection, Pool},
redis::{cmd, AsyncCommands, ConnectionInfo as RedisConnectionInfo, RedisError},
RedisConnectionManager,
};
use std::fmt::Display;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

trait Hex {
fn hex(&self) -> String;
}

impl Hex for &[u8] {
fn hex(&self) -> String {
self.iter()
.map(|x| -> String { format!("{:02x}", x) })
.collect()
}
}

#[derive(Debug)]
struct WithNamespace {
namespace: Option<String>,
password: Option<String>,
}

#[async_trait::async_trait]
impl CustomizeConnection<Connection, RedisError> for WithNamespace {
async fn on_acquire(&self, connection: &mut Connection) -> Result<(), RedisError> {
match self.namespace {
Some(ref ns) if ns != "default" => {
let mut c = cmd("SELECT");
let c = c.arg(ns);
if let Some(ref password) = self.password {
c.arg(password);
}

let result = c.query_async(connection).await;
if let Err(ref err) = result {
error!("failed to switch namespace to {}: {}", ns, err);
}
result
}
_ => Ok(()),
}
}
}

pub struct ConnectionInfo {
redis: RedisConnectionInfo,
namespace: Option<String>,
}

impl ConnectionInfo {
/// create a new instance of connection info
pub fn new(redis: RedisConnectionInfo, namespace: Option<String>) -> Self {
Self { redis, namespace }
}
}

impl Display for ConnectionInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "redis://{}", self.redis.addr)?;
if let Some(ref ns) = self.namespace {
write!(f, "/{}", ns)?;
}

if let Some(ref pw) = self.redis.redis.password {
write!(f, " [password: {}]", pw)?;
}

Ok(())
}
}

pub trait IntoConnectionInfo {
fn into_connection_info(self) -> Result<ConnectionInfo>;
}

impl IntoConnectionInfo for ConnectionInfo {
fn into_connection_info(self) -> Result<ConnectionInfo> {
Ok(self)
}
}
#[derive(Clone)]
pub struct Cache {
pool: Pool<RedisConnectionManager>,
pub struct Cache<S: Store> {
store: S,
root: PathBuf,
}

impl Cache {
pub async fn new<S, P>(info: S, root: P) -> Result<Cache>
impl<S> Cache<S>
where
S: Store,
{
pub fn new<P>(root: P, store: S) -> Self
where
S: IntoConnectionInfo,
P: Into<PathBuf>,
{
let info: ConnectionInfo = info.into_connection_info()?;
let namespace = WithNamespace {
namespace: info.namespace,
password: info.redis.redis.password.clone(),
};
log::debug!("switching namespace to: {:?}", namespace.namespace);
let mgr = RedisConnectionManager::new(info.redis)?;

let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await?;

Ok(Cache {
pool,
Cache {
store,
root: root.into(),
})
}
}

// get content from redis
async fn get_data(&self, id: &[u8], key: &[u8]) -> Result<Vec<u8>> {
let mut con = self.pool.get().await.context("failed to get connection")?;
//con.
let result: Vec<u8> = con.get(id).await?;
if result.is_empty() {
anyhow::bail!("invalid chunk length downloaded");
}
let result = self.store.get(id).await?;

let key = unsafe { std::str::from_utf8_unchecked(key) };
let mut decoder = snap::raw::Decoder::new();
Expand Down Expand Up @@ -243,20 +144,14 @@ impl Locker {
}
}

impl<S: AsRef<str>> IntoConnectionInfo for S {
fn into_connection_info(self) -> Result<ConnectionInfo> {
let mut u: url::Url = self.as_ref().parse().context("failed to parse url")?;
let namespace: Option<String> = match u.path_segments() {
None => None,
Some(mut segments) => segments.next().map(|s| s.to_owned()),
};

u.set_path("");
trait Hex {
fn hex(&self) -> String;
}

use bb8_redis::redis::IntoConnectionInfo as RedisIntoConnectionInfo;
Ok(ConnectionInfo {
redis: RedisIntoConnectionInfo::into_connection_info(u)?,
namespace,
})
impl Hex for &[u8] {
fn hex(&self) -> String {
self.iter()
.map(|x| -> String { format!("{:02x}", x) })
.collect()
}
}
19 changes: 13 additions & 6 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::fungi::{
meta::{FileType, Inode},
Reader,
};
use crate::store::Store;

use anyhow::{ensure, Result};
use polyfuse::reply::FileAttr;
Expand All @@ -31,24 +32,30 @@ type FHash = [u8; 16];
type BlockSize = u64;

#[derive(Clone)]
pub struct Filesystem {
pub struct Filesystem<S>
where
S: Store + Clone,
{
meta: Reader,
cache: cache::Cache,
cache: cache::Cache<S>,
lru: Arc<Mutex<lru::LruCache<FHash, (File, BlockSize)>>>,
}

impl Filesystem {
pub fn new(meta: Reader, cache: cache::Cache) -> Filesystem {
impl<S> Filesystem<S>
where
S: Store + Clone,
{
pub fn new(meta: Reader, cache: cache::Cache<S>) -> Self {
Filesystem {
meta,
cache,
lru: Arc::new(Mutex::new(lru::LruCache::new(LRU_CAP))),
}
}

pub async fn mount<S>(&self, mnt: S) -> Result<()>
pub async fn mount<P>(&self, mnt: P) -> Result<()>
where
S: Into<PathBuf>,
P: Into<PathBuf>,
{
let mountpoint: PathBuf = mnt.into();
ensure!(mountpoint.is_dir(), "mountpoint must be a directory");
Expand Down
98 changes: 97 additions & 1 deletion src/fungi/meta.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::Path;
use std::path::{Path, PathBuf};

use sqlx::{sqlite::SqliteRow, FromRow, Row, SqlitePool};

Expand Down Expand Up @@ -49,6 +49,9 @@ pub enum Error {

#[error("io error: {0}")]
IO(#[from] std::io::Error),

#[error("unknown error: {0}")]
Anyhow(#[from] anyhow::Error),
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -181,6 +184,16 @@ impl<'a> Tag<'a> {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Walk {
Continue,
Break,
}
#[async_trait::async_trait]
pub trait WalkVisitor {
async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk>;
}

#[derive(Clone)]
pub struct Reader {
pool: SqlitePool,
Expand Down Expand Up @@ -253,6 +266,50 @@ impl Reader {

Ok(results)
}

pub async fn walk<W: WalkVisitor + Send>(&self, visitor: &mut W) -> Result<()> {
let node = self.inode(1).await?;
let path: PathBuf = "".into();
self.walk_node(&path, &node, visitor).await?;
Ok(())
}

#[async_recursion::async_recursion]
async fn walk_node<W: WalkVisitor + Send>(
&self,
path: &Path,
node: &Inode,
visitor: &mut W,
) -> Result<Walk> {
let path = path.join(&node.name);
if visitor.visit(&path, node).await? == Walk::Break {
return Ok(Walk::Break);
}

let mut offset = 0;
loop {
let children = self.children(node.ino, 1000, offset).await?;
if children.is_empty() {
break;
}

for child in children {
offset += 1;
if self.walk_node(&path, &child, visitor).await? == Walk::Break {
// if a file return break, we stop scanning this directory
if child.mode.is(FileType::Regular) {
return Ok(Walk::Continue);
}
// if child was a directory we continue because it means
// a directory returned a break on first visit so the
// entire directory is skipped anyway
continue;
}
}
}

Ok(Walk::Continue)
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -473,4 +530,43 @@ mod test {
assert_eq!(m.permissions(), 0754);
assert_eq!(m.file_type(), FileType::Regular);
}

#[tokio::test]
async fn test_walk() {
const PATH: &str = "/tmp/walk.fl";
let meta = Writer::new(PATH).await.unwrap();

let parent = meta
.inode(Inode {
name: "/".into(),
data: Some("target".into()),
..Inode::default()
})
.await
.unwrap();

for name in ["bin", "etc", "usr"] {
meta.inode(Inode {
parent: parent,
name: name.into(),
..Inode::default()
})
.await
.unwrap();
}

let meta = Reader::new(PATH).await.unwrap();
//TODO: validate the walk
meta.walk(&mut WalkTest).await.unwrap();
}

struct WalkTest;

#[async_trait::async_trait]
impl WalkVisitor for WalkTest {
async fn visit(&mut self, path: &Path, node: &Inode) -> Result<Walk> {
println!("{} = {:?}", node.ino, path);
Ok(Walk::Continue)
}
}
}
Loading

0 comments on commit 1ce3cad

Please sign in to comment.