Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Byoung/object store async lookup #253

Merged
merged 6 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,19 @@
(result $err (expected (error $fastly_status)))
)

(@interface func (export "lookup_async")
(param $store $object_store_handle)
(param $key string)
(param $pending_body_handle_out (@witx pointer $pending_kv_lookup_handle))
(result $err (expected (error $fastly_status)))
)

(@interface func (export "pending_lookup_wait")
(param $pending_body_handle $pending_kv_lookup_handle)
(param $body_handle_out (@witx pointer $body_handle))
(result $err (expected (error $fastly_status)))
)

(@interface func (export "insert")
(param $store $object_store_handle)
(param $key string)
Expand Down
2 changes: 2 additions & 0 deletions lib/compute-at-edge-abi/typenames.witx
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@
(typename $dictionary_handle (handle))
;;; A handle to an Object Store.
(typename $object_store_handle (handle))
;;; A handle to a pending KV request.
(typename $pending_kv_lookup_handle (handle))
;;; A handle to a Secret Store.
(typename $secret_store_handle (handle))
;;; A handle to an individual secret.
Expand Down
4 changes: 4 additions & 0 deletions lib/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ pub enum HandleError {
#[error("Invalid pending request handle: {0}")]
InvalidPendingRequestHandle(crate::wiggle_abi::types::PendingRequestHandle),

/// A lookup handle was not valid.
#[error("Invalid pending KV lookup handle: {0}")]
InvalidPendingKvLookupHandle(crate::wiggle_abi::types::PendingKvLookupHandle),

/// A dictionary handle was not valid.
#[error("Invalid dictionary handle: {0}")]
InvalidDictionaryHandle(crate::wiggle_abi::types::DictionaryHandle),
Expand Down
61 changes: 58 additions & 3 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
mod async_item;
mod downstream;

pub use async_item::{AsyncItem, PeekableTask};
pub use async_item::{AsyncItem, PeekableTask, PendingKvTask};

use {
self::downstream::DownstreamResponse,
Expand All @@ -18,8 +18,8 @@ use {
upstream::{SelectTarget, TlsConfig},
wiggle_abi::types::{
self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle,
ObjectStoreHandle, PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle,
SecretStoreHandle,
ObjectStoreHandle, PendingKvLookupHandle, PendingRequestHandle, RequestHandle,
ResponseHandle, SecretHandle, SecretStoreHandle,
},
},
cranelift_entity::{entity_impl, PrimaryMap},
Expand Down Expand Up @@ -639,6 +639,49 @@ impl Session {
self.object_store.lookup(obj_store_key, obj_key)
}

/// Insert a [`PendingLookup`] into the session.
///
/// This method returns a new [`PendingKvLookupHandle`], which can then be used to access
/// and mutate the pending lookup.
pub fn insert_pending_kv_lookup(&mut self, pending: PendingKvTask) -> PendingKvLookupHandle {
self.async_items
.push(Some(AsyncItem::PendingKvLookup(pending)))
.into()
}

/// Take ownership of a [`PendingLookup`], given its [`PendingKvLookupHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a pending lookup in the
/// session.
pub fn take_pending_kv_lookup(
&mut self,
handle: PendingKvLookupHandle,
) -> Result<PendingKvTask, HandleError> {
// check that this is a pending request before removing it
let _ = self.pending_kv_lookup(handle)?;

self.async_items
.get_mut(handle.into())
.and_then(Option::take)
.and_then(AsyncItem::into_pending_kv_lookup)
.ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
}

/// Get a reference to a [`PendingLookup`], given its [`PendingKvLookupHandle`].
///
/// Returns a [`HandleError`] if the handle is not associated with a lookup in the
/// session.
pub fn pending_kv_lookup(
&self,
handle: PendingKvLookupHandle,
) -> Result<&PendingKvTask, HandleError> {
self.async_items
.get(handle.into())
.and_then(Option::as_ref)
.and_then(AsyncItem::as_pending_kv_lookup)
.ok_or(HandleError::InvalidPendingKvLookupHandle(handle))
}

// ----- Secret Store API -----

pub fn secret_store_handle(&mut self, name: &str) -> Option<SecretStoreHandle> {
Expand Down Expand Up @@ -912,3 +955,15 @@ impl From<AsyncItemHandle> for types::AsyncItemHandle {
types::AsyncItemHandle::from(h.as_u32())
}
}

impl From<PendingKvLookupHandle> for AsyncItemHandle {
fn from(h: PendingKvLookupHandle) -> AsyncItemHandle {
AsyncItemHandle::from_u32(h.into())
}
}

impl From<AsyncItemHandle> for PendingKvLookupHandle {
fn from(h: AsyncItemHandle) -> PendingKvLookupHandle {
PendingKvLookupHandle::from(h.as_u32())
}
}
25 changes: 25 additions & 0 deletions lib/src/session/async_item.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::object_store::ObjectStoreError;
use crate::{body::Body, error::Error, streaming_body::StreamingBody};
use anyhow::anyhow;
use futures::Future;
use futures::FutureExt;
use http::Response;
use tokio::sync::oneshot;

pub type PendingKvTask = PeekableTask<Result<Vec<u8>, ObjectStoreError>>;

/// Represents either a full body, or the write end of a streaming body.
///
/// This enum is needed because we reuse the handle for a body when it is transformed into a streaming
Expand All @@ -14,6 +17,7 @@ pub enum AsyncItem {
Body(Body),
StreamingBody(StreamingBody),
PendingReq(PeekableTask<Response<Body>>),
PendingKvLookup(PendingKvTask),
}

impl AsyncItem {
Expand Down Expand Up @@ -70,6 +74,20 @@ impl AsyncItem {
}
}

pub fn as_pending_kv_lookup(&self) -> Option<&PendingKvTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}

pub fn into_pending_kv_lookup(self) -> Option<PendingKvTask> {
match self {
Self::PendingKvLookup(req) => Some(req),
_ => None,
}
}

pub fn as_pending_req(&self) -> Option<&PeekableTask<Response<Body>>> {
match self {
Self::PendingReq(req) => Some(req),
Expand All @@ -96,6 +114,7 @@ impl AsyncItem {
Self::StreamingBody(body) => body.await_ready().await,
Self::Body(body) => body.await_ready().await,
Self::PendingReq(req) => req.await_ready().await,
Self::PendingKvLookup(obj) => obj.await_ready().await,
}
}

Expand All @@ -110,6 +129,12 @@ impl From<PeekableTask<Response<Body>>> for AsyncItem {
}
}

impl From<PendingKvTask> for AsyncItem {
fn from(task: PendingKvTask) -> Self {
Self::PendingKvLookup(task)
}
}

#[derive(Debug)]
pub enum PeekableTask<T> {
Waiting(oneshot::Receiver<Result<T, Error>>),
Expand Down
2 changes: 1 addition & 1 deletion lib/src/wiggle_abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ wiggle::from_witx!({
errors: { fastly_status => Error },
async: {
fastly_async_io::{select},
fastly_object_store::{insert},
fastly_object_store::{insert, lookup_async, pending_lookup_wait},
fastly_http_body::{append, read, write},
fastly_http_req::{pending_req_select, pending_req_poll, pending_req_wait, send, send_async, send_async_streaming},
}
Expand Down
39 changes: 39 additions & 0 deletions lib/src/wiggle_abi/obj_store_impl.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! fastly_obj_store` hostcall implementations.

use super::types::PendingKvLookupHandle;
use crate::session::PeekableTask;

use {
crate::{
body::Body,
Expand Down Expand Up @@ -49,6 +52,42 @@ impl FastlyObjectStore for Session {
}
}

async fn lookup_async<'a>(
&mut self,
store: ObjectStoreHandle,
key: &GuestPtr<str>,
opt_pending_body_handle_out: &GuestPtr<PendingKvLookupHandle>,
) -> Result<(), Error> {
let store = self.get_obj_store_key(store).unwrap();
let key = ObjectKey::new(&*key.as_str()?.ok_or(Error::SharedMemory)?)?;
// just create a future that's already ready
let fut = futures::future::ok(self.obj_lookup(store, &key));
let task = PeekableTask::spawn(fut).await;
opt_pending_body_handle_out.write(self.insert_pending_kv_lookup(task))?;
Ok(())
}

async fn pending_lookup_wait<'a>(
&mut self,
pending_body_handle: PendingKvLookupHandle,
opt_body_handle_out: &GuestPtr<BodyHandle>,
) -> Result<(), Error> {
let pending_obj = self
.take_pending_kv_lookup(pending_body_handle)?
.recv()
.await?;
// proceed with the normal match from lookup()
match pending_obj {
Ok(obj) => {
let new_handle = self.insert_body(Body::from(obj));
opt_body_handle_out.write(new_handle)?;
Ok(())
}
Err(ObjectStoreError::MissingObject) => Ok(()),
Err(err) => Err(err.into()),
}
}

async fn insert<'a>(
&mut self,
store: ObjectStoreHandle,
Expand Down