diff --git a/lib/compute-at-edge-abi/compute-at-edge.witx b/lib/compute-at-edge-abi/compute-at-edge.witx index 764bb45c..00d5c29d 100644 --- a/lib/compute-at-edge-abi/compute-at-edge.witx +++ b/lib/compute-at-edge-abi/compute-at-edge.witx @@ -500,6 +500,19 @@ (result $err (expected (error $fastly_status))) ) + (@interface func (export "lookup_async") + (param $store $object_store_handle) + (param $key string) + (param $pending_body_handle_out (@witx pointer $pending_kv_lookup_handle)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "pending_lookup_wait") + (param $pending_body_handle $pending_kv_lookup_handle) + (param $body_handle_out (@witx pointer $body_handle)) + (result $err (expected (error $fastly_status))) + ) + (@interface func (export "insert") (param $store $object_store_handle) (param $key string) diff --git a/lib/compute-at-edge-abi/typenames.witx b/lib/compute-at-edge-abi/typenames.witx index 36291311..84a0c1dd 100644 --- a/lib/compute-at-edge-abi/typenames.witx +++ b/lib/compute-at-edge-abi/typenames.witx @@ -94,6 +94,8 @@ (typename $dictionary_handle (handle)) ;;; A handle to an Object Store. (typename $object_store_handle (handle)) +;;; A handle to a pending KV request. +(typename $pending_kv_lookup_handle (handle)) ;;; A handle to a Secret Store. (typename $secret_store_handle (handle)) ;;; A handle to an individual secret. diff --git a/lib/src/error.rs b/lib/src/error.rs index 4a164f2c..95d9a73a 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -238,6 +238,10 @@ pub enum HandleError { #[error("Invalid pending request handle: {0}")] InvalidPendingRequestHandle(crate::wiggle_abi::types::PendingRequestHandle), + /// A lookup handle was not valid. + #[error("Invalid pending KV lookup handle: {0}")] + InvalidPendingKvLookupHandle(crate::wiggle_abi::types::PendingKvLookupHandle), + /// A dictionary handle was not valid. #[error("Invalid dictionary handle: {0}")] InvalidDictionaryHandle(crate::wiggle_abi::types::DictionaryHandle), diff --git a/lib/src/session.rs b/lib/src/session.rs index 0dd7e92e..36c3f792 100644 --- a/lib/src/session.rs +++ b/lib/src/session.rs @@ -3,7 +3,7 @@ mod async_item; mod downstream; -pub use async_item::{AsyncItem, PeekableTask}; +pub use async_item::{AsyncItem, PeekableTask, PendingKvTask}; use { self::downstream::DownstreamResponse, @@ -18,8 +18,8 @@ use { upstream::{SelectTarget, TlsConfig}, wiggle_abi::types::{ self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle, - ObjectStoreHandle, PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, - SecretStoreHandle, + ObjectStoreHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle, + ResponseHandle, SecretHandle, SecretStoreHandle, }, }, cranelift_entity::{entity_impl, PrimaryMap}, @@ -639,6 +639,49 @@ impl Session { self.object_store.lookup(obj_store_key, obj_key) } + /// Insert a [`PendingLookup`] into the session. + /// + /// This method returns a new [`PendingKvLookupHandle`], which can then be used to access + /// and mutate the pending lookup. + pub fn insert_pending_kv_lookup(&mut self, pending: PendingKvTask) -> PendingKvLookupHandle { + self.async_items + .push(Some(AsyncItem::PendingKvLookup(pending))) + .into() + } + + /// Take ownership of a [`PendingLookup`], given its [`PendingKvLookupHandle`]. + /// + /// Returns a [`HandleError`] if the handle is not associated with a pending lookup in the + /// session. + pub fn take_pending_kv_lookup( + &mut self, + handle: PendingKvLookupHandle, + ) -> Result { + // check that this is a pending request before removing it + let _ = self.pending_kv_lookup(handle)?; + + self.async_items + .get_mut(handle.into()) + .and_then(Option::take) + .and_then(AsyncItem::into_pending_kv_lookup) + .ok_or(HandleError::InvalidPendingKvLookupHandle(handle)) + } + + /// Get a reference to a [`PendingLookup`], given its [`PendingKvLookupHandle`]. + /// + /// Returns a [`HandleError`] if the handle is not associated with a lookup in the + /// session. + pub fn pending_kv_lookup( + &self, + handle: PendingKvLookupHandle, + ) -> Result<&PendingKvTask, HandleError> { + self.async_items + .get(handle.into()) + .and_then(Option::as_ref) + .and_then(AsyncItem::as_pending_kv_lookup) + .ok_or(HandleError::InvalidPendingKvLookupHandle(handle)) + } + // ----- Secret Store API ----- pub fn secret_store_handle(&mut self, name: &str) -> Option { @@ -912,3 +955,15 @@ impl From for types::AsyncItemHandle { types::AsyncItemHandle::from(h.as_u32()) } } + +impl From for AsyncItemHandle { + fn from(h: PendingKvLookupHandle) -> AsyncItemHandle { + AsyncItemHandle::from_u32(h.into()) + } +} + +impl From for PendingKvLookupHandle { + fn from(h: AsyncItemHandle) -> PendingKvLookupHandle { + PendingKvLookupHandle::from(h.as_u32()) + } +} diff --git a/lib/src/session/async_item.rs b/lib/src/session/async_item.rs index 747e98f0..46a443cd 100644 --- a/lib/src/session/async_item.rs +++ b/lib/src/session/async_item.rs @@ -1,3 +1,4 @@ +use crate::object_store::ObjectStoreError; use crate::{body::Body, error::Error, streaming_body::StreamingBody}; use anyhow::anyhow; use futures::Future; @@ -5,6 +6,8 @@ use futures::FutureExt; use http::Response; use tokio::sync::oneshot; +pub type PendingKvTask = PeekableTask, ObjectStoreError>>; + /// Represents either a full body, or the write end of a streaming body. /// /// This enum is needed because we reuse the handle for a body when it is transformed into a streaming @@ -14,6 +17,7 @@ pub enum AsyncItem { Body(Body), StreamingBody(StreamingBody), PendingReq(PeekableTask>), + PendingKvLookup(PendingKvTask), } impl AsyncItem { @@ -70,6 +74,20 @@ impl AsyncItem { } } + pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvTask> { + match self { + Self::PendingKvLookup(req) => Some(req), + _ => None, + } + } + + pub fn into_pending_kv_lookup(self) -> Option { + match self { + Self::PendingKvLookup(req) => Some(req), + _ => None, + } + } + pub fn as_pending_req(&self) -> Option<&PeekableTask>> { match self { Self::PendingReq(req) => Some(req), @@ -96,6 +114,7 @@ impl AsyncItem { Self::StreamingBody(body) => body.await_ready().await, Self::Body(body) => body.await_ready().await, Self::PendingReq(req) => req.await_ready().await, + Self::PendingKvLookup(obj) => obj.await_ready().await, } } @@ -110,6 +129,12 @@ impl From>> for AsyncItem { } } +impl From for AsyncItem { + fn from(task: PendingKvTask) -> Self { + Self::PendingKvLookup(task) + } +} + #[derive(Debug)] pub enum PeekableTask { Waiting(oneshot::Receiver>), diff --git a/lib/src/wiggle_abi.rs b/lib/src/wiggle_abi.rs index c8923220..a6d7fe16 100644 --- a/lib/src/wiggle_abi.rs +++ b/lib/src/wiggle_abi.rs @@ -70,7 +70,7 @@ wiggle::from_witx!({ errors: { fastly_status => Error }, async: { fastly_async_io::{select}, - fastly_object_store::{insert}, + fastly_object_store::{insert, lookup_async, pending_lookup_wait}, fastly_http_body::{append, read, write}, fastly_http_req::{pending_req_select, pending_req_poll, pending_req_wait, send, send_async, send_async_streaming}, } diff --git a/lib/src/wiggle_abi/obj_store_impl.rs b/lib/src/wiggle_abi/obj_store_impl.rs index 40c2958c..1d0cc908 100644 --- a/lib/src/wiggle_abi/obj_store_impl.rs +++ b/lib/src/wiggle_abi/obj_store_impl.rs @@ -1,5 +1,8 @@ //! fastly_obj_store` hostcall implementations. +use super::types::PendingKvLookupHandle; +use crate::session::PeekableTask; + use { crate::{ body::Body, @@ -49,6 +52,42 @@ impl FastlyObjectStore for Session { } } + async fn lookup_async<'a>( + &mut self, + store: ObjectStoreHandle, + key: &GuestPtr, + opt_pending_body_handle_out: &GuestPtr, + ) -> Result<(), Error> { + let store = self.get_obj_store_key(store).unwrap(); + let key = ObjectKey::new(&*key.as_str()?.ok_or(Error::SharedMemory)?)?; + // just create a future that's already ready + let fut = futures::future::ok(self.obj_lookup(store, &key)); + let task = PeekableTask::spawn(fut).await; + opt_pending_body_handle_out.write(self.insert_pending_kv_lookup(task))?; + Ok(()) + } + + async fn pending_lookup_wait<'a>( + &mut self, + pending_body_handle: PendingKvLookupHandle, + opt_body_handle_out: &GuestPtr, + ) -> Result<(), Error> { + let pending_obj = self + .take_pending_kv_lookup(pending_body_handle)? + .recv() + .await?; + // proceed with the normal match from lookup() + match pending_obj { + Ok(obj) => { + let new_handle = self.insert_body(Body::from(obj)); + opt_body_handle_out.write(new_handle)?; + Ok(()) + } + Err(ObjectStoreError::MissingObject) => Ok(()), + Err(err) => Err(err.into()), + } + } + async fn insert<'a>( &mut self, store: ObjectStoreHandle,