Skip to content

Commit

Permalink
📝 Try to improve the implementation on cloudflare side.
Browse files Browse the repository at this point in the history
  • Loading branch information
langyo committed Aug 16, 2024
1 parent 585567b commit 9bfa84a
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 9 deletions.
1 change: 1 addition & 0 deletions packages/database_driver_cloudflare/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ log = "^0.4"
serde = { version = "^1", features = ["derive"] }
serde_json = { version = "^1" }
strum = { version = "^0.26", features = ["derive"] }
postcard = { version = "^1", features = ["alloc"] }
uuid = { version = "^1", features = [
'v4',
'fast-rng',
Expand Down
195 changes: 186 additions & 9 deletions packages/database_driver_cloudflare/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result};
use bytes::Bytes;
use chrono::DateTime;
use std::sync::Arc;
use uuid::Uuid;

use worker::{send::SendFuture, Env};

Expand Down Expand Up @@ -56,35 +57,211 @@ impl BucketStore for ProxyBucket {
async fn delete(&self, key: String) -> Result<()> {
let env = self.env.bucket(self.bucket_name.as_str())?;

SendFuture::new(async move {
let _ = env
.delete(key.as_str())
let ret = SendFuture::new(async move {
env.delete(key.as_str())
.await
.map_err(|err| anyhow!("Failed to delete key-value pair: {:?}", err));
.map_err(|err| anyhow!("Failed to delete key-value pair: {:?}", err))
})
.await;

Ok(())
ret
}

async fn create_multipart_upload(&self) -> Result<String> {
todo!()
let env = self.env.bucket(self.bucket_name.as_str())?;

let ret = SendFuture::new(async move {
let key = Uuid::new_v4().to_string();
match env.create_multipart_upload(key.clone()).execute().await {
Ok(info) => {
let upload_id = info.upload_id().await;
let upload_id = format!("{}_{}", key, upload_id);

let parts_metadata: Vec<String> = vec![];
let parts_metadata = postcard::to_allocvec(&parts_metadata)?;

self.set(
format!("__multi_{}", upload_id),
Bytes::from(parts_metadata),
)
.await?;
Ok(upload_id)
}
Err(err) => Err(anyhow!("Failed to create multipart upload: {:?}", err)),
}
})
.await;

ret
}

async fn append_multipart_upload(&self, upload_id: String, data: Bytes) -> Result<()> {
todo!()
let env = self.env.bucket(self.bucket_name.as_str())?;

let (key, upload_id) = upload_id
.split_once('_')
.map(|(key, upload_id)| (key.to_string(), upload_id.to_string()))
.ok_or(anyhow!(
"Failed to split into key and upload_id: {:?}",
upload_id
))?;

let ret = SendFuture::new(async move {
let parts_metadata =
self.get(format!("__multi_{}", upload_id))
.await?
.ok_or(anyhow!(
"Failed to get part number for multipart upload: {:?}",
upload_id
))?;
let parts_metadata: Vec<String> = postcard::from_bytes(&parts_metadata)?;

match env.resume_multipart_upload(key, upload_id.clone()) {
Ok(uploader) => match uploader
.upload_part(
parts_metadata.len() as u16,
worker::Data::Bytes(data.to_vec()),
)
.await
{
Ok(info) => {
let mut parts_metadata = parts_metadata.clone();
parts_metadata.push(info.etag());
let parts_metadata = postcard::to_allocvec(&parts_metadata)?;

match self
.set(
format!("__multi_{}", upload_id),
Bytes::from(parts_metadata),
)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(anyhow!(
"Failed to set part number for multipart upload: {:?}",
err
)),
}
}
Err(err) => Err(anyhow!("Failed to append multipart upload: {:?}", err)),
},
Err(err) => Err(anyhow!("Failed to resume multipart upload: {:?}", err)),
}
})
.await;

ret
}

async fn complete_multipart_upload(
&self,
upload_id: String,
final_data_key: Option<String>,
) -> Result<BucketMultipartUploadResult> {
todo!()
if final_data_key.is_some() {
unimplemented!("final_data_key is not supported yet");
}

let env = self.env.bucket(self.bucket_name.as_str())?;

let (key, upload_id) = upload_id
.split_once('_')
.map(|(key, upload_id)| (key.to_string(), upload_id.to_string()))
.ok_or(anyhow!(
"Failed to split into key and upload_id: {:?}",
upload_id
))?;

let ret = SendFuture::new(async move {
let parts_metadata =
self.get(format!("__multi_{}", upload_id))
.await?
.ok_or(anyhow!(
"Failed to get part number for multipart upload: {:?}",
upload_id
))?;
let parts_metadata: Vec<String> = postcard::from_bytes(&parts_metadata)?;

match env.resume_multipart_upload(key, upload_id.clone()) {
Ok(uploader) => match uploader
.complete(
parts_metadata
.iter()
.enumerate()
.map(|(index, item)| {
worker::UploadedPart::new(index as u16, item.clone())
})
.collect::<Vec<_>>(),
)
.await
{
Ok(data) => Ok(BucketMultipartUploadResult {
key: data.key().to_string(),
version: data.version().to_string(),
size: data.size() as usize,

etag: data.etag().to_string(),
http_etag: data.http_etag().to_string(),
uploaded: DateTime::from_timestamp_millis(
data.uploaded().as_millis() as i64
)
.unwrap_or_default()
.to_utc(),

http_metadata: {
let obj = data.http_metadata();

BucketMultipartUploadResultHttpMetadata {
content_type: obj.content_type.map(|s| s.to_string()),
content_language: obj.content_language.map(|s| s.to_string()),
content_disposition: obj.content_disposition.map(|s| s.to_string()),
content_encoding: obj.content_encoding.map(|s| s.to_string()),
cache_control: obj.cache_control.map(|s| s.to_string()),
cache_expiry: obj.cache_expiry.map(|ts| {
DateTime::from_timestamp_millis(ts.as_millis() as i64)
.unwrap_or_default()
.to_utc()
}),
}
},
custom_metadata: data.custom_metadata().unwrap_or_default(),
}),
Err(err) => Err(anyhow!("Failed to append multipart upload: {:?}", err)),
},
Err(err) => Err(anyhow!("Failed to resume multipart upload: {:?}", err)),
}
})
.await;

ret
}

async fn abort_multipart_upload(&self, upload_id: String) -> Result<()> {
todo!()
let env = self.env.bucket(self.bucket_name.as_str())?;

let (key, upload_id) = upload_id
.split_once('_')
.map(|(key, upload_id)| (key.to_string(), upload_id.to_string()))
.ok_or(anyhow!(
"Failed to split into key and upload_id: {:?}",
upload_id
))?;

let ret = SendFuture::new(async move {
match env.resume_multipart_upload(key, upload_id.clone()) {
Ok(uploader) => match uploader.abort().await {
Ok(_) => {
self.delete(format!("__multi_{}", upload_id)).await?;
Ok(())
}
Err(err) => Err(anyhow!("Failed to abort multipart upload: {:?}", err)),
},
Err(err) => Err(anyhow!("Failed to resume multipart upload: {:?}", err)),
}
})
.await;

ret
}
}

Expand Down

0 comments on commit 9bfa84a

Please sign in to comment.