Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change store type dyn -> static #54

Merged
merged 3 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
fn main() {
println!(
"cargo:rustc-env=GIT_VERSION={}",
git_version::git_version!(args = ["--tags", "--always", "--dirty=-modified"], fallback = "unknown")
git_version::git_version!(
args = ["--tags", "--always", "--dirty=-modified"],
fallback = "unknown"
)
);
}
2 changes: 1 addition & 1 deletion src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::fungi::{
};
use crate::store::Store;

use anyhow::{ensure, Result, Context};
use anyhow::{ensure, Context, Result};
use polyfuse::reply::FileAttr;
use polyfuse::{
op,
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod test {
use crate::{
cache::Cache,
fungi::meta,
store::{dir::DirStore, Router},
store::{dir::DirStore, Router, Stores},
};
use std::path::PathBuf;
use tokio::{fs, io::AsyncReadExt};
Expand Down Expand Up @@ -61,8 +61,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, Stores::Dir(store0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general any Store should work in place of Stores since they both implement Store trait. I did some changes to fix this and will push it

store.add(0x80, 0xff, Stores::Dir(store1));

pack(writer, store, &source, false).await.unwrap();

Expand All @@ -72,8 +72,8 @@ mod test {
let store1 = DirStore::new(root.join("store1")).await.unwrap();
let mut store = Router::new();

store.add(0x00, 0x7f, Box::new(store0));
store.add(0x80, 0xff, Box::new(store1));
store.add(0x00, 0x7f, Stores::Dir(store0));
store.add(0x80, 0xff, Stores::Dir(store1));

let cache = Cache::new(root.join("cache"), store);

Expand Down
25 changes: 10 additions & 15 deletions src/store/dir.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,11 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use std::io::ErrorKind;
use std::os::unix::prelude::OsStrExt;
use std::path::PathBuf;
use tokio::fs;
use url;

const SCHEME: &str = "dir";

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let u = url::Url::parse(&url)?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(Box::new(DirStore::new(u.path()).await?))
}

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
pub const SCHEME: &str = "dir";

/// DirStore is a simple store that store blobs on the filesystem
/// and is mainly used for testing
Expand All @@ -29,6 +16,14 @@ pub struct DirStore {
}

impl DirStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<DirStore> {
let u = url::Url::parse(url.as_ref())?;
if u.scheme() != SCHEME {
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
}

Ok(DirStore::new(u.path()).await?)
}
pub async fn new<P: Into<PathBuf>>(root: P) -> Result<Self> {
let root = root.into();
fs::create_dir_all(&root).await?;
Expand Down
88 changes: 47 additions & 41 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,35 @@ pub mod s3store;
pub mod zdb;

use rand::seq::SliceRandom;
use std::{collections::HashMap, pin::Pin};

pub use bs::BlockStore;
use futures::Future;

lazy_static::lazy_static! {
static ref STORES: HashMap<String, Factory> = register_stores();
}

/// register_stores is used to register the stores built in types
/// so they can be created with a url
fn register_stores() -> HashMap<String, Factory> {
let mut m: HashMap<String, Factory> = HashMap::default();
m.insert("dir".into(), dir::make);
m.insert("zdb".into(), zdb::make);
m.insert("s3".into(), s3store::make);
m.insert("s3s".into(), s3store::make);
m.insert("s3s+tls".into(), s3store::make);

m
}

pub async fn make<U: AsRef<str>>(u: U) -> Result<Box<dyn Store>> {
pub async fn make<U: AsRef<str>>(u: U) -> Result<Stores> {
let parsed = url::Url::parse(u.as_ref())?;
let factory = match STORES.get(parsed.scheme()) {
None => return Err(Error::UnknownStore(parsed.scheme().into())),
Some(factory) => factory,
};

factory(u.as_ref()).await
if parsed.scheme() == dir::SCHEME {
return Ok(Stores::Dir(
dir::DirStore::make(&u)
.await
.expect("failed to make dir store"),
));
}
if parsed.scheme() == "s3" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should also be done for s3s and s3s+tls as per original map

i think also you can use a match instead of multiple if statements

return Ok(Stores::S3(
s3store::S3Store::make(&u)
.await
.expect("failed to make s3 store"),
));
}
if parsed.scheme() == "zdb" {
return Ok(Stores::ZDB(
zdb::ZdbStore::make(&u)
.await
.expect("failed to make zdb store"),
));
}

Err(Error::UnknownStore(parsed.scheme().into()))
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -101,16 +100,6 @@ pub trait Store: Send + Sync + 'static {
fn routes(&self) -> Vec<Route>;
}

/// The store factory works as a factory for a specific store
/// this is only needed to be able dynamically create different types
/// of stores based only on scheme of the store url.
/// the Factory returns a factory future that resolved to a Box<dyn Store>
pub type Factory = fn(u: &str) -> FactoryFuture;

/// FactoryFuture is a future that resolves to a Result<Box<dyn Store>> this
/// is returned by a factory function like above
pub type FactoryFuture = Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>>;

/// Router holds a set of shards (stores) where each store can be configured to serve
/// a range of hashes.
///
Expand All @@ -119,7 +108,7 @@ pub type FactoryFuture = Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>>;
///
/// On set, the router set the object on all matching stores, and fails if at least
/// one store fails, or if no store matches the key
pub type Router = router::Router<Box<dyn Store>>;
pub type Router = router::Router<Stores>;

#[async_trait::async_trait]
impl Store for Router {
Expand All @@ -131,7 +120,7 @@ impl Store for Router {

// to make it fare we shuffle the list of matching routers randomly everytime
// before we do a get
let mut routers: Vec<&Box<dyn Store>> = self.route(key[0]).collect();
let mut routers: Vec<&Stores> = self.route(key[0]).collect();
routers.shuffle(&mut rand::thread_rng());
for store in routers {
match store.get(key).await {
Expand Down Expand Up @@ -182,16 +171,33 @@ impl Store for Router {
routes
}
}
pub enum Stores {
S3(s3store::S3Store),
Dir(dir::DirStore),
ZDB(zdb::ZdbStore),
}

#[async_trait::async_trait]
impl Store for Box<dyn Store> {
impl Store for Stores {
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
self.as_ref().get(key).await
match self {
self::Stores::S3(s3_store) => s3_store.get(key).await,
self::Stores::Dir(dir_store) => dir_store.get(key).await,
self::Stores::ZDB(zdb_store) => zdb_store.get(key).await,
}
}
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
self.as_ref().set(key, blob).await
match self {
self::Stores::S3(s3_store) => s3_store.set(key, blob).await,
self::Stores::Dir(dir_store) => dir_store.set(key, blob).await,
self::Stores::ZDB(zdb_store) => zdb_store.set(key, blob).await,
}
}
fn routes(&self) -> Vec<Route> {
self.as_ref().routes()
match self {
self::Stores::S3(s3_store) => s3_store.routes(),
self::Stores::Dir(dir_store) => dir_store.routes(),
self::Stores::ZDB(zdb_store) => zdb_store.routes(),
}
}
}
17 changes: 5 additions & 12 deletions src/store/s3store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::{Error, Result, Route, Store};

use anyhow::Context;
use futures::Future;
use s3::{creds::Credentials, error::S3Error, Bucket, Region};
use std::pin::Pin;
use url::Url;

fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
Expand Down Expand Up @@ -46,17 +44,8 @@ fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (cred, region, bucket_name) = get_config(&url)?;
Ok(Box::new(S3Store::new(&url, &bucket_name, region, cred)?))
}

pub fn make(url: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>> {
Box::pin(make_inner(url.into()))
}

#[derive(Clone)]
struct S3Store {
pub struct S3Store {
bucket: Bucket,
url: String,
// this is only here as a work around for this bug https://github.com/durch/rust-s3/issues/337
Expand All @@ -67,6 +56,10 @@ struct S3Store {
}

impl S3Store {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<S3Store> {
let (cred, region, bucket_name) = get_config(url.as_ref())?;
Ok(S3Store::new(url.as_ref(), &bucket_name, region, cred)?)
}
pub fn new(url: &str, bucket_name: &str, region: Region, cred: Credentials) -> Result<Self> {
let bucket = Bucket::new(bucket_name, region, cred)
.context("failed instantiate bucket")?
Expand Down
57 changes: 28 additions & 29 deletions src/store/zdb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Error, FactoryFuture, Result, Route, Store};
use super::{Error, Result, Route, Store};
use anyhow::Context;

use bb8_redis::{
Expand Down Expand Up @@ -77,42 +77,41 @@ fn get_connection_info<U: AsRef<str>>(u: U) -> Result<(ConnectionInfo, Option<St
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (mut info, namespace) = get_connection_info(&url)?;

let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};
#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
}

log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);
impl ZdbStore {
pub async fn make<U: AsRef<str>>(url: &U) -> Result<ZdbStore> {
let (mut info, namespace) = get_connection_info(url.as_ref())?;

let mgr =
RedisConnectionManager::new(info).context("failed to create redis connection manager")?;
let namespace = WithNamespace {
namespace,
password: info.redis.password.take(),
};

let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;
log::debug!("connection {:#?}", info);
log::debug!("switching namespace to: {:?}", namespace.namespace);

Ok(Box::from(ZdbStore { url, pool }))
}
let mgr = RedisConnectionManager::new(info)
.context("failed to create redis connection manager")?;

pub fn make(url: &str) -> FactoryFuture {
Box::pin(make_inner(url.into()))
}
let pool = Pool::builder()
.max_size(20)
.connection_customizer(Box::new(namespace))
.build(mgr)
.await
.context("failed to create connection pool")?;

#[derive(Clone)]
pub struct ZdbStore {
url: String,
pool: Pool<RedisConnectionManager>,
Ok(ZdbStore {
url: url.as_ref().to_string(),
pool,
})
}
}

impl ZdbStore {}

#[async_trait::async_trait]
impl Store for ZdbStore {
async fn get(&self, key: &[u8]) -> super::Result<Vec<u8>> {
Expand Down
Loading