Skip to content

Commit

Permalink
wip: refactor s3store to implement store and register it
Browse files Browse the repository at this point in the history
  • Loading branch information
Omarabdul3ziz committed Sep 27, 2023
1 parent b74ebd8 commit acdadc9
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 31 deletions.
5 changes: 5 additions & 0 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod bs;
pub mod dir;
mod router;
pub mod zdb;
pub mod s3;

use rand::seq::SliceRandom;
use std::{collections::HashMap, pin::Pin};
Expand All @@ -19,6 +20,7 @@ fn register_stores() -> HashMap<String, Factory> {
let mut m: HashMap<String, Factory> = HashMap::default();
m.insert("dir".into(), dir::make);
m.insert("zdb".into(), zdb::make);
m.insert("s3".into(), s3::make);

m
}
Expand Down Expand Up @@ -52,6 +54,9 @@ pub enum Error {
#[error("encryption error")]
EncryptionError,

#[error("bucket creation error")]
BucketCreationError,

// TODO: better display for the Box<Vec<Self>>
#[error("multiple error: {0:?}")]
Multiple(Box<Vec<Self>>),
Expand Down
94 changes: 63 additions & 31 deletions src/store/s3.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,49 @@
use anyhow::{anyhow, Context, Result};
use rusoto_core::{Region, RusotoError};
use super::{Error, Result, Route, Store};
use anyhow::Context;
use futures::Future;
use std::pin::Pin;

use rusoto_core::{ByteStream, Region, RusotoError};
use rusoto_s3::{
CreateBucketError, CreateBucketRequest, GetObjectRequest, PutObjectRequest, S3Client, S3,
};
use tokio::io::AsyncReadExt;

fn get_config() -> Result<(String, String, Credentials)> {
// TODO: get these from .env?
Ok((
String::from(""),
String::from(""),
Credentials {
access_key: String::from(""),
secret_key: String::from(""),
},
))
}

async fn make_inner(url: String) -> Result<Box<dyn Store>> {
let (region, bucket, cred) = get_config()?;
// TODO: move creating the bucket here
Ok(Box::new(S3Store::new(&url, &region, &bucket, cred).await?))
}

pub fn make(url: &str) -> Pin<Box<dyn Future<Output = Result<Box<dyn Store>>>>> {
Box::pin(make_inner(url.into()))
}

#[derive(Clone)]
struct BucketManager {
struct S3Store {
client: S3Client,
bucket: String,
endpoint: String,
}

struct Credentials {
access_key: String,
secret_key: String,
}

impl BucketManager {
impl S3Store {
pub async fn new(
endpoint: &str,
region: &str,
Expand All @@ -29,7 +56,7 @@ impl BucketManager {
};

let dispatcher =
rusoto_core::request::HttpClient::new().context("Error creating http client.")?;
rusoto_core::request::HttpClient::new().context("failed to create http client.")?;

let provider = rusoto_core::credential::StaticProvider::new_minimal(
cred.access_key.clone(),
Expand All @@ -48,50 +75,55 @@ impl BucketManager {
Ok(Self {
client,
bucket: bucket.to_owned(),
endpoint: endpoint.to_owned(),
})
}
Err(err) => Err(err).context("Error creating bucket"),
Err(_) => return Err(Error::BucketCreationError),
}
}
}

async fn set(&self, key: &str, data: &[u8]) -> Result<()> {
let put_object_request = PutObjectRequest {
bucket: self.bucket.clone(),
key: key.to_owned(),
body: Some(data.to_vec().into()),
..Default::default()
};
self.client
.put_object(put_object_request)
.await
.context("Error uploading")?;
Ok(())
}

async fn get(&self, key: &str) -> Result<Vec<u8>> {
#[async_trait::async_trait]
impl Store for S3Store {
async fn get(&self, key: &[u8]) -> super::Result<Vec<u8>> {
let get_object_request = GetObjectRequest {
bucket: self.bucket.clone(),
key: key.to_owned(),
key: hex::encode(key),
..Default::default()
};

let res = self
.client
.get_object(get_object_request)
.await
.context("Error retrieving data")?;
.context("failed to get blob")?;

// ensure body in not none
let body = res
.body
.ok_or_else(|| anyhow!("No data found in S3 object"))?;
let body = res.body.ok_or(Error::KeyNotFound)?;

let mut buffer = Vec::new();
body.into_async_read()
.read_to_end(&mut buffer)
.await
.context("Error reading data")?;
if let Err(_) = body.into_async_read().read_to_end(&mut buffer).await {
return Err(Error::InvalidBlob);
}

Ok(buffer)
}

async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
let put_object_request = PutObjectRequest {
bucket: self.bucket.clone(),
key: hex::encode(key),
body: Some(ByteStream::from(blob.to_owned())),
..Default::default()
};
self.client
.put_object(put_object_request)
.await
.context("failed to set blob")?;

Ok(())
}

fn routes(&self) -> Vec<Route> {
vec![Route::url(self.endpoint.clone())]
}
}

0 comments on commit acdadc9

Please sign in to comment.