Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Byoung/object store async lookup #253

Merged
merged 6 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,12 @@
(@interface func (export "lookup_async")
(param $store $object_store_handle)
(param $key string)
(param $pending_body_handle_out (@witx pointer $pending_object_store_handle))
(param $pending_body_handle_out (@witx pointer $pending_kv_lookup_handle))
(result $err (expected (error $fastly_status)))
)

(@interface func (export "pending_lookup_wait")
(param $pending_body_handle $pending_object_store_handle)
(param $pending_body_handle $pending_kv_lookup_handle)
(param $body_handle_out (@witx pointer $body_handle))
(result $err (expected (error $fastly_status)))
)
Expand Down
4 changes: 2 additions & 2 deletions lib/compute-at-edge-abi/typenames.witx
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@
(typename $dictionary_handle (handle))
;;; A handle to an Object Store.
(typename $object_store_handle (handle))
;;; A handle to a pending Object Store request.
(typename $pending_object_store_handle (handle))
;;; A handle to a pending KV request.
(typename $pending_kv_lookup_handle (handle))
;;; A handle to a Secret Store.
(typename $secret_store_handle (handle))
;;; A handle to an individual secret.
Expand Down
4 changes: 2 additions & 2 deletions lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ pub enum HandleError {
InvalidPendingRequestHandle(crate::wiggle_abi::types::PendingRequestHandle),

/// A lookup handle was not valid.
#[error("Invalid pending lookup handle: {0}")]
InvalidPendingLookupHandle(crate::wiggle_abi::types::PendingObjectStoreHandle),
#[error("Invalid pending KV lookup handle: {0}")]
InvalidPendingKvLookupHandle(crate::wiggle_abi::types::PendingKvLookupHandle),

/// A dictionary handle was not valid.
#[error("Invalid dictionary handle: {0}")]
Expand Down
39 changes: 18 additions & 21 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod async_item;
mod downstream;

pub use async_item::{AsyncItem, PeekableTask};
pub use async_item::{AsyncItem, PeekableTask, PendingKvTask};

use {
self::downstream::DownstreamResponse,
Expand All @@ -18,7 +18,7 @@ use {
upstream::{SelectTarget, TlsConfig},
wiggle_abi::types::{
self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle,
ObjectStoreHandle, PendingObjectStoreHandle, PendingRequestHandle, RequestHandle,
ObjectStoreHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle,
ResponseHandle, SecretHandle, SecretStoreHandle,
},
},
Expand Down Expand Up @@ -641,48 +641,45 @@ impl Session {

/// Insert a [`PendingLookup`] into the session.
///
/// This method returns a new [`PendingObjectStoreHandle`], which can then be used to access
/// This method returns a new [`PendingKvLookupHandle`], which can then be used to access
/// and mutate the pending lookup.
pub fn insert_pending_lookup(
&mut self,
pending: PeekableTask<Result<Vec<u8>, ObjectStoreError>>,
) -> PendingObjectStoreHandle {
pub fn insert_pending_lookup(&mut self, pending: PendingKvTask) -> PendingKvLookupHandle {
self.async_items
.push(Some(AsyncItem::PendingLookup(pending)))
.push(Some(AsyncItem::PendingKvLookup(pending)))
.into()
}

/// Take ownership of a [`PendingLookup`], given its [`PendingObjectStoreHandle`].
/// Take ownership of a [`PendingLookup`], given its [`PendingKvLookupHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a pending lookup in the
/// session.
pub fn take_pending_lookup(
&mut self,
handle: PendingObjectStoreHandle,
) -> Result<PeekableTask<Result<Vec<u8>, ObjectStoreError>>, HandleError> {
handle: PendingKvLookupHandle,
) -> Result<PendingKvTask, HandleError> {
// check that this is a pending request before removing it
let _ = self.pending_lookup(handle)?;

self.async_items
.get_mut(handle.into())
.and_then(Option::take)
.and_then(AsyncItem::into_pending_lookup)
.ok_or(HandleError::InvalidPendingLookupHandle(handle))
.ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
}

/// Get a reference to a [`PendingLookup`], given its [`PendingObjectStoreHandle`].
/// Get a reference to a [`PendingLookup`], given its [`PendingKvLookupHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a lookup in the
/// session.
pub fn pending_lookup(
&self,
handle: PendingObjectStoreHandle,
) -> Result<&PeekableTask<Result<Vec<u8>, ObjectStoreError>>, HandleError> {
handle: PendingKvLookupHandle,
) -> Result<&PendingKvTask, HandleError> {
self.async_items
.get(handle.into())
.and_then(Option::as_ref)
.and_then(AsyncItem::as_pending_lookup)
.ok_or(HandleError::InvalidPendingLookupHandle(handle))
.ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
}

// ----- Secret Store API -----
Expand Down Expand Up @@ -959,14 +956,14 @@ impl From<AsyncItemHandle> for types::AsyncItemHandle {
}
}

impl From<PendingObjectStoreHandle> for AsyncItemHandle {
fn from(h: PendingObjectStoreHandle) -> AsyncItemHandle {
impl From<PendingKvLookupHandle> for AsyncItemHandle {
fn from(h: PendingKvLookupHandle) -> AsyncItemHandle {
AsyncItemHandle::from_u32(h.into())
}
}

impl From<AsyncItemHandle> for PendingObjectStoreHandle {
fn from(h: AsyncItemHandle) -> PendingObjectStoreHandle {
PendingObjectStoreHandle::from(h.as_u32())
impl From<AsyncItemHandle> for PendingKvLookupHandle {
fn from(h: AsyncItemHandle) -> PendingKvLookupHandle {
PendingKvLookupHandle::from(h.as_u32())
}
}
20 changes: 11 additions & 9 deletions lib/src/session/async_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use futures::FutureExt;
use http::Response;
use tokio::sync::oneshot;

pub type PendingKvTask = PeekableTask<Result<Vec<u8>, ObjectStoreError>>;

/// Represents either a full body, or the write end of a streaming body.
///
/// This enum is needed because we reuse the handle for a body when it is transformed into a streaming
Expand All @@ -15,7 +17,7 @@ pub enum AsyncItem {
Body(Body),
StreamingBody(StreamingBody),
PendingReq(PeekableTask<Response<Body>>),
PendingLookup(PeekableTask<Result<Vec<u8>, ObjectStoreError>>),
PendingKvLookup(PendingKvTask),
}

impl AsyncItem {
Expand Down Expand Up @@ -72,16 +74,16 @@ impl AsyncItem {
}
}

pub fn as_pending_lookup(&self) -> Option<&PeekableTask<Result<Vec<u8>, ObjectStoreError>>> {
pub fn as_pending_lookup(&self) -> Option<&PendingKvTask> {
match self {
Self::PendingLookup(req) => Some(req),
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}

pub fn into_pending_lookup(self) -> Option<PeekableTask<Result<Vec<u8>, ObjectStoreError>>> {
pub fn into_pending_lookup(self) -> Option<PendingKvTask> {
match self {
Self::PendingLookup(req) => Some(req),
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}
Expand Down Expand Up @@ -112,7 +114,7 @@ impl AsyncItem {
Self::StreamingBody(body) => body.await_ready().await,
Self::Body(body) => body.await_ready().await,
Self::PendingReq(req) => req.await_ready().await,
Self::PendingLookup(obj) => obj.await_ready().await,
Self::PendingKvLookup(obj) => obj.await_ready().await,
}
}

Expand All @@ -127,9 +129,9 @@ impl From<PeekableTask<Response<Body>>> for AsyncItem {
}
}

impl From<PeekableTask<Result<Vec<u8>, ObjectStoreError>>> for AsyncItem {
fn from(req: PeekableTask<Result<Vec<u8>, ObjectStoreError>>) -> Self {
Self::PendingLookup(req)
impl From<PendingKvTask> for AsyncItem {
fn from(task: PendingKvTask) -> Self {
Self::PendingKvLookup(task)
}
}

Expand Down
6 changes: 3 additions & 3 deletions lib/src/wiggle_abi/obj_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! fastly_obj_store` hostcall implementations.

use super::types::PendingObjectStoreHandle;
use super::types::PendingKvLookupHandle;
use crate::session::PeekableTask;

use {
Expand Down Expand Up @@ -56,7 +56,7 @@ impl FastlyObjectStore for Session {
&mut self,
store: ObjectStoreHandle,
key: &GuestPtr<str>,
opt_pending_body_handle_out: &GuestPtr<PendingObjectStoreHandle>,
opt_pending_body_handle_out: &GuestPtr<PendingKvLookupHandle>,
) -> Result<(), Error> {
let store = self.get_obj_store_key(store).unwrap();
let key = ObjectKey::new(&*key.as_str()?.ok_or(Error::SharedMemory)?)?;
Expand All @@ -69,7 +69,7 @@ impl FastlyObjectStore for Session {

async fn pending_lookup_wait<'a>(
&mut self,
pending_body_handle: PendingObjectStoreHandle,
pending_body_handle: PendingKvLookupHandle,
opt_body_handle_out: &GuestPtr<BodyHandle>,
) -> Result<(), Error> {
let pending_obj = self
Expand Down