Skip to content

Commit

Permalink
change store type dyn -> static
Browse files Browse the repository at this point in the history
  • Loading branch information
rawdaGastan committed May 19, 2024
1 parent a2aaa68 commit d5af1a2
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 104 deletions.
5 changes: 4 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
fn main() {
println!(
"cargo:rustc-env=GIT_VERSION={}",
git_version::git_version!(args = ["--tags", "--always", "--dirty=-modified"], fallback = "unknown")
git_version::git_version!(
args = ["--tags", "--always", "--dirty=-modified"],
fallback = "unknown"
)
);
}
2 changes: 1 addition & 1 deletion src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::fungi::{
};
use crate::store::Store;

use anyhow::{ensure, Result, Context};
use anyhow::{ensure, Context, Result};
use polyfuse::reply::FileAttr;
use polyfuse::{
op,
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod test {
use crate::{
cache::Cache,
fungi::meta,
store::{dir::DirStore, Router},
store::{dir::DirStore, Router, Stores},
};
use std::path::PathBuf;
use tokio::{fs, io::AsyncReadExt};
Expand Down Expand Up @@ -61,8 +61,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, Stores::Dir(store0));
store.add(0x80, 0xff, Stores::Dir(store1));

pack(writer, store, &source, false).await.unwrap();

Expand All @@ -72,8 +72,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, Stores::Dir(store0));
store.add(0x80, 0xff, Stores::Dir(store1));

let cache = Cache::new(root.join("cache"), store);

Expand Down
25 changes: 10 additions & 15 deletions src/store/dir.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use std::io::ErrorKind;
use std::os::unix::prelude::OsStrExt;
use std::path::PathBuf;
use tokio::fs;
use url;

const SCHEME: &str = "dir";

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let u = url::Url::parse(&url)?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(Box::new(DirStore::new(u.path()).await?))
}

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
pub const SCHEME: &str = "dir";

/// DirStore is a simple store that store blobs on the filesystem
/// and is mainly used for testing
Expand All @@ -29,6 +16,14 @@ pub struct DirStore {
}

impl DirStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<DirStore> {
let u = url::Url::parse(url.as_ref())?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(DirStore::new(u.path()).await?)
}
pub async fn new<P: Into<PathBuf>>(root: P) -> Result<Self> {
let root = root.into();
fs::create_dir_all(&root).await?;
Expand Down
88 changes: 47 additions & 41 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,35 @@ pub mod s3store;
pub mod zdb;

use rand::seq::SliceRandom;
use std::{collections::HashMap, pin::Pin};

pub use bs::BlockStore;
use futures::Future;

lazy_static::lazy_static! {
static ref STORES: HashMap<String, Factory> = register_stores();
}

/// register_stores is used to register the stores built in types
/// so they can be created with a url
fn register_stores() -> HashMap<String, Factory> {
let mut m: HashMap<String, Factory> = HashMap::default();
m.insert("dir".into(), dir::make);
m.insert("zdb".into(), zdb::make);
m.insert("s3".into(), s3store::make);
m.insert("s3s".into(), s3store::make);
m.insert("s3s+tls".into(), s3store::make);

m
}

pub async fn make<U: AsRef<str>>(u: U) -> Result<Box<dyn Store>> {
pub async fn make<U: AsRef<str>>(u: U) -> Result<Stores> {
let parsed = url::Url::parse(u.as_ref())?;
let factory = match STORES.get(parsed.scheme()) {
None => return Err(Error::UnknownStore(parsed.scheme().into())),
Some(factory) => factory,
};

factory(u.as_ref()).await
if parsed.scheme() == dir::SCHEME {
return Ok(Stores::Dir(
dir::DirStore::make(&u)
.await
.expect("failed to make dir store"),
));
}
if parsed.scheme() == "s3" {
return Ok(Stores::S3(
s3store::S3Store::make(&u)
.await
.expect("failed to make s3 store"),
));
}
if parsed.scheme() == "zdb" {
return Ok(Stores::ZDB(
zdb::ZdbStore::make(&u)
.await
.expect("failed to make zdb store"),
));
}

Err(Error::UnknownStore(parsed.scheme().into()))
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -101,16 +100,6 @@ pub trait Store: Send + Sync + 'static {
fn routes(&self) -> Vec<Route>;
}

/// The store factory works as a factory for a specific store
/// this is only needed to be able dynamically create different types
/// of stores based only on scheme of the store url.
/// the Factory returns a factory future that resolved to a Box<dyn Store>
pub type Factory = fn(u: &str) -> FactoryFuture;

/// FactoryFuture is a future that resolves to a Result<Box<dyn Store>> this
/// is returned by a factory function like above
pub type FactoryFuture = Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>>;

/// Router holds a set of shards (stores) where each store can be configured to serve
/// a range of hashes.
///
Expand All @@ -119,7 +108,7 @@ pub type FactoryFuture = Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>>;
///
/// On set, the router set the object on all matching stores, and fails if at least
/// one store fails, or if no store matches the key
pub type Router = router::Router<Box<dyn Store>>;
pub type Router = router::Router<Stores>;

#[async_trait::async_trait]
impl Store for Router {
Expand All @@ -131,7 +120,7 @@ impl Store for Router {

// to make it fare we shuffle the list of matching routers randomly everytime
// before we do a get
let mut routers: Vec<&Box<dyn Store>> = self.route(key[0]).collect();
let mut routers: Vec<&Stores> = self.route(key[0]).collect();
routers.shuffle(&mut rand::thread_rng());
for store in routers {
match store.get(key).await {
Expand Down Expand Up @@ -182,16 +171,33 @@ impl Store for Router {
routes
}
}
pub enum Stores {
S3(s3store::S3Store),
Dir(dir::DirStore),
ZDB(zdb::ZdbStore),
}

#[async_trait::async_trait]
impl Store for Box<dyn Store> {
impl Store for Stores {
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
self.as_ref().get(key).await
match self {
self::Stores::S3(s3_store) => s3_store.get(key).await,
self::Stores::Dir(dir_store) => dir_store.get(key).await,
self::Stores::ZDB(zdb_store) => zdb_store.get(key).await,
}
}
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
self.as_ref().set(key, blob).await
match self {
self::Stores::S3(s3_store) => s3_store.set(key, blob).await,
self::Stores::Dir(dir_store) => dir_store.set(key, blob).await,
self::Stores::ZDB(zdb_store) => zdb_store.set(key, blob).await,
}
}
fn routes(&self) -> Vec<Route> {
self.as_ref().routes()
match self {
self::Stores::S3(s3_store) => s3_store.routes(),
self::Stores::Dir(dir_store) => dir_store.routes(),
self::Stores::ZDB(zdb_store) => zdb_store.routes(),
}
}
}
17 changes: 5 additions & 12 deletions src/store/s3store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::{Error, Result, Route, Store};

use anyhow::Context;
use futures::Future;
use s3::{creds::Credentials, error::S3Error, Bucket, Region};
use std::pin::Pin;
use url::Url;

fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
Expand Down Expand Up @@ -46,17 +44,8 @@ fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (cred, region, bucket_name) = get_config(&url)?;
Ok(Box::new(S3Store::new(&url, &bucket_name, region, cred)?))
}

pub fn make(url: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>> {
Box::pin(make_inner(url.into()))
}

#[derive(Clone)]
struct S3Store {
pub struct S3Store {
bucket: Bucket,
url: String,
// this is only here as a work around for this bug https://github.com/durch/rust-s3/issues/337
Expand All @@ -67,6 +56,10 @@ struct S3Store {
}

impl S3Store {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<S3Store> {
let (cred, region, bucket_name) = get_config(url.as_ref())?;
Ok(S3Store::new(url.as_ref(), &bucket_name, region, cred)?)
}
pub fn new(url: &str, bucket_name: &str, region: Region, cred: Credentials) -> Result<Self> {
let bucket = Bucket::new(bucket_name, region, cred)
.context("failed instantiate bucket")?
Expand Down
57 changes: 28 additions & 29 deletions src/store/zdb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use anyhow::Context;

use bb8_redis::{
Expand Down Expand Up @@ -77,42 +77,41 @@ fn get_connection_info<U: AsRef<str>>(u: U) -> Result<(ConnectionInfo, Option<St
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (mut info, namespace) = get_connection_info(&url)?;

let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};
#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
}

log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);
impl ZdbStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<ZdbStore> {
let (mut info, namespace) = get_connection_info(url.as_ref())?;

let mgr =
RedisConnectionManager::new(info).context("failed to create redis connection manager")?;
let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};

let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;
log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);

Ok(Box::from(ZdbStore { url, pool }))
}
let mgr = RedisConnectionManager::new(info)
.context("failed to create redis connection manager")?;

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;

#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
Ok(ZdbStore {
url: url.as_ref().to_string(),
pool,
})
}
}

impl ZdbStore {}

#[async_trait::async_trait]
impl Store for ZdbStore {
async fn get(&self, key: &[u8]) -> super::Result<Vec<u8>> {
Expand Down

0 comments on commit d5af1a2

Please sign in to comment.