Skip to content

Commit

Permalink
Move methods onto Storage
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Mar 12, 2024
1 parent 5492390 commit 5434271
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,42 @@ impl Storage {
.context(UploadNotFoundSnafu { id })?;
Ok(parts)
}

fn create_upload(&mut self) -> MultipartId {
let etag = self.next_etag;
self.next_etag += 1;
self.uploads.insert(etag, Default::default());
etag.to_string()
}

fn put_part(&mut self, id: &MultipartId, part_idx: usize, data: Bytes) -> Result<PartId> {
let upload = self.upload_mut(id)?;
if part_idx <= upload.parts.len() {
upload.parts.resize(part_idx + 1, None);
}
upload.parts[part_idx] = Some(data);
Ok(PartId {
content_id: Default::default(),
})
}

fn complete_upload(&mut self, path: &Path, id: &MultipartId) -> Result<PutResult> {
let upload = self.remove_upload(id)?;

let mut cap = 0;
for (part, x) in upload.parts.iter().enumerate() {
cap += x.as_ref().context(MissingPartSnafu { part })?.len();
}
let mut buf = Vec::with_capacity(cap);
for x in &upload.parts {
buf.extend_from_slice(x.as_ref().unwrap())
}
let etag = self.insert(path, buf.into());
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
})
}
}

impl std::fmt::Display for InMemory {
Expand Down Expand Up @@ -393,11 +429,7 @@ impl ObjectStore for InMemory {
#[async_trait]
impl MultiPartStore for InMemory {
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
let mut storage = self.storage.write();
let etag = storage.next_etag;
storage.next_etag += 1;
storage.uploads.insert(etag, Default::default());
Ok(etag.to_string())
Ok(self.storage.write().create_upload())
}

async fn put_part(
Expand All @@ -407,15 +439,7 @@ impl MultiPartStore for InMemory {
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
let mut storage = self.storage.write();
let upload = storage.upload_mut(id)?;
if part_idx <= upload.parts.len() {
upload.parts.resize(part_idx + 1, None);
}
upload.parts[part_idx] = Some(data);
Ok(PartId {
content_id: Default::default(),
})
self.storage.write().put_part(id, part_idx, data)
}

async fn complete_multipart(
Expand All @@ -424,22 +448,7 @@ impl MultiPartStore for InMemory {
id: &MultipartId,
_parts: Vec<PartId>,
) -> Result<PutResult> {
let mut storage = self.storage.write();
let upload = storage.remove_upload(id)?;

let mut cap = 0;
for (part, x) in upload.parts.iter().enumerate() {
cap += x.as_ref().context(MissingPartSnafu { part })?.len();
}
let mut buf = Vec::with_capacity(cap);
for x in &upload.parts {
buf.extend_from_slice(x.as_ref().unwrap())
}
let etag = storage.insert(path, buf.into());
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
})
self.storage.write().complete_upload(path, id)
}

async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
Expand Down

0 comments on commit 5434271

Please sign in to comment.