Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change store type dyn -> static #54

Merged
merged 3 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
fn main() {
println!(
"cargo:rustc-env=GIT_VERSION={}",
git_version::git_version!(args = ["--tags", "--always", "--dirty=-modified"], fallback = "unknown")
git_version::git_version!(
args = ["--tags", "--always", "--dirty=-modified"],
fallback = "unknown"
)
);
}
2 changes: 1 addition & 1 deletion src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::fungi::{
};
use crate::store::Store;

use anyhow::{ensure, Result, Context};
use anyhow::{ensure, Context, Result};
use polyfuse::reply::FileAttr;
use polyfuse::{
op,
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, store0);
store.add(0x80, 0xff, store1);

pack(writer, store, &source, false).await.unwrap();

Expand All @@ -72,8 +72,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, store0);
store.add(0x80, 0xff, store1);

let cache = Cache::new(root.join("cache"), store);

Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use clap::{ArgAction, Args, Parser, Subcommand};

use rfs::cache;
use rfs::fungi;
use rfs::store::{self, Router};
use rfs::store::{self, Router, Stores};

use regex::Regex;

Expand Down Expand Up @@ -230,7 +230,7 @@ async fn fuse(opts: MountOptions) -> Result<()> {
filesystem.mount(opts.target).await
}

async fn get_router(meta: &fungi::Reader) -> Result<Router> {
async fn get_router(meta: &fungi::Reader) -> Result<Router<Stores>> {
let mut router = store::Router::new();

for route in meta.routes().await.context("failed to get store routes")? {
Expand All @@ -243,7 +243,7 @@ async fn get_router(meta: &fungi::Reader) -> Result<Router> {
Ok(router)
}

async fn parse_router(urls: &[String]) -> Result<Router> {
async fn parse_router(urls: &[String]) -> Result<Router<Stores>> {
let mut router = Router::new();
let pattern = r"^(?P<range>[0-9a-f]{2}-[0-9a-f]{2})=(?P<url>.+)$";
let re = Regex::new(pattern)?;
Expand Down
25 changes: 10 additions & 15 deletions src/store/dir.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use std::io::ErrorKind;
use std::os::unix::prelude::OsStrExt;
use std::path::PathBuf;
use tokio::fs;
use url;

const SCHEME: &str = "dir";

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let u = url::Url::parse(&url)?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(Box::new(DirStore::new(u.path()).await?))
}

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
pub const SCHEME: &str = "dir";

/// DirStore is a simple store that store blobs on the filesystem
/// and is mainly used for testing
Expand All @@ -29,6 +16,14 @@ pub struct DirStore {
}

impl DirStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<DirStore> {
let u = url::Url::parse(url.as_ref())?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(DirStore::new(u.path()).await?)
}
pub async fn new<P: Into<PathBuf>>(root: P) -> Result<Self> {
let root = root.into();
fs::create_dir_all(&root).await?;
Expand Down
96 changes: 46 additions & 50 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,32 @@ pub mod s3store;
pub mod zdb;

use rand::seq::SliceRandom;
use std::{collections::HashMap, pin::Pin};

pub use bs::BlockStore;
use futures::Future;

lazy_static::lazy_static! {
static ref STORES: HashMap<String, Factory> = register_stores();
}

/// register_stores is used to register the stores built in types
/// so they can be created with a url
fn register_stores() -> HashMap<String, Factory> {
let mut m: HashMap<String, Factory> = HashMap::default();
m.insert("dir".into(), dir::make);
m.insert("zdb".into(), zdb::make);
m.insert("s3".into(), s3store::make);
m.insert("s3s".into(), s3store::make);
m.insert("s3s+tls".into(), s3store::make);

m
}
pub use self::router::Router;

pub async fn make<U: AsRef<str>>(u: U) -> Result<Box<dyn Store>> {
pub async fn make<U: AsRef<str>>(u: U) -> Result<Stores> {
let parsed = url::Url::parse(u.as_ref())?;
let factory = match STORES.get(parsed.scheme()) {
None => return Err(Error::UnknownStore(parsed.scheme().into())),
Some(factory) => factory,
};

factory(u.as_ref()).await
match parsed.scheme() {
dir::SCHEME => return Ok(Stores::Dir(
dir::DirStore::make(&u)
.await
.expect("failed to make dir store"),
)),
"s3" | "s3s" | "s3s+tls" => return Ok(Stores::S3(
s3store::S3Store::make(&u)
.await
.expect(format!("failed to make {} store", parsed.scheme()).as_str()),
)),
"zdb" => return Ok(Stores::ZDB(
zdb::ZdbStore::make(&u)
.await
.expect("failed to make zdb store"),
)),
_ => return Err(Error::UnknownStore(parsed.scheme().into())),
}
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -101,28 +97,11 @@ pub trait Store: Send + Sync + 'static {
fn routes(&self) -> Vec<Route>;
}

/// The store factory works as a factory for a specific store
/// this is only needed to be able dynamically create different types
/// of stores based only on scheme of the store url.
/// the Factory returns a factory future that resolved to a Box<dyn Store>
pub type Factory = fn(u: &str) -> FactoryFuture;

/// FactoryFuture is a future that resolves to a Result<Box<dyn Store>> this
/// is returned by a factory function like above
pub type FactoryFuture = Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>>;

/// Router holds a set of shards (stores) where each store can be configured to serve
/// a range of hashes.
///
/// On get, all possible stores that is configured to serve this key are tried until the first
/// one succeed
///
/// On set, the router set the object on all matching stores, and fails if at least
/// one store fails, or if no store matches the key
pub type Router = router::Router<Box<dyn Store>>;

#[async_trait::async_trait]
impl Store for Router {
impl<S> Store for Router<S>
where
S: Store,
{
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
if key.is_empty() {
return Err(Error::InvalidKey);
Expand All @@ -131,7 +110,7 @@ impl Store for Router {

// to make it fare we shuffle the list of matching routers randomly everytime
// before we do a get
let mut routers: Vec<&Box<dyn Store>> = self.route(key[0]).collect();
let mut routers: Vec<&S> = self.route(key[0]).collect();
routers.shuffle(&mut rand::thread_rng());
for store in routers {
match store.get(key).await {
Expand Down Expand Up @@ -182,16 +161,33 @@ impl Store for Router {
routes
}
}
pub enum Stores {
S3(s3store::S3Store),
Dir(dir::DirStore),
ZDB(zdb::ZdbStore),
}

#[async_trait::async_trait]
impl Store for Box<dyn Store> {
impl Store for Stores {
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
self.as_ref().get(key).await
match self {
self::Stores::S3(s3_store) => s3_store.get(key).await,
self::Stores::Dir(dir_store) => dir_store.get(key).await,
self::Stores::ZDB(zdb_store) => zdb_store.get(key).await,
}
}
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
self.as_ref().set(key, blob).await
match self {
self::Stores::S3(s3_store) => s3_store.set(key, blob).await,
self::Stores::Dir(dir_store) => dir_store.set(key, blob).await,
self::Stores::ZDB(zdb_store) => zdb_store.set(key, blob).await,
}
}
fn routes(&self) -> Vec<Route> {
self.as_ref().routes()
match self {
self::Stores::S3(s3_store) => s3_store.routes(),
self::Stores::Dir(dir_store) => dir_store.routes(),
self::Stores::ZDB(zdb_store) => zdb_store.routes(),
}
}
}
17 changes: 5 additions & 12 deletions src/store/s3store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::{Error, Result, Route, Store};

use anyhow::Context;
use futures::Future;
use s3::{creds::Credentials, error::S3Error, Bucket, Region};
use std::pin::Pin;
use url::Url;

fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
Expand Down Expand Up @@ -46,17 +44,8 @@ fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (cred, region, bucket_name) = get_config(&url)?;
Ok(Box::new(S3Store::new(&url, &bucket_name, region, cred)?))
}

pub fn make(url: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>> {
Box::pin(make_inner(url.into()))
}

#[derive(Clone)]
struct S3Store {
pub struct S3Store {
bucket: Bucket,
url: String,
// this is only here as a work around for this bug https://github.com/durch/rust-s3/issues/337
Expand All @@ -67,6 +56,10 @@ struct S3Store {
}

impl S3Store {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<S3Store> {
let (cred, region, bucket_name) = get_config(url.as_ref())?;
Ok(S3Store::new(url.as_ref(), &bucket_name, region, cred)?)
}
pub fn new(url: &str, bucket_name: &str, region: Region, cred: Credentials) -> Result<Self> {
let bucket = Bucket::new(bucket_name, region, cred)
.context("failed instantiate bucket")?
Expand Down
57 changes: 28 additions & 29 deletions src/store/zdb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use anyhow::Context;

use bb8_redis::{
Expand Down Expand Up @@ -77,42 +77,41 @@ fn get_connection_info<U: AsRef<str>>(u: U) -> Result<(ConnectionInfo, Option<St
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (mut info, namespace) = get_connection_info(&url)?;

let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};
#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
}

log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);
impl ZdbStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<ZdbStore> {
let (mut info, namespace) = get_connection_info(url.as_ref())?;

let mgr =
RedisConnectionManager::new(info).context("failed to create redis connection manager")?;
let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};

let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;
log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);

Ok(Box::from(ZdbStore { url, pool }))
}
let mgr = RedisConnectionManager::new(info)
.context("failed to create redis connection manager")?;

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;

#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
Ok(ZdbStore {
url: url.as_ref().to_string(),
pool,
})
}
}

impl ZdbStore {}

#[async_trait::async_trait]
impl Store for ZdbStore {
async fn get(&self, key: &[u8]) -> super::Result<Vec<u8>> {
Expand Down
Loading