Skip to content

Commit

Permalink
Implement Results::set_pipeline() (#515)
Browse files Browse the repository at this point in the history
 Implement `Results::set_pipeline()`.

This provides the same functionality as capnproto-c++'s
`CallContext::setPipeline()`, but with a slightly different API.

Whereas in capnproto-c++ `setPipeline()` requires usage of a PipelineBuilder,
here we just re-use the existing Results message.
  • Loading branch information
dwrensha authored Sep 9, 2024
1 parent 7bdb6a4 commit bcfb07a
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 18 deletions.
41 changes: 35 additions & 6 deletions capnp-rpc/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,19 @@ struct Results {
message: Option<message::Builder<message::HeapAllocator>>,
cap_table: Vec<Option<Box<dyn ClientHook>>>,
results_done_fulfiller: Option<oneshot::Sender<Box<dyn ResultsDoneHook>>>,
pipeline_sender: Option<crate::queued::PipelineInnerSender>,
}

impl Results {
fn new(fulfiller: oneshot::Sender<Box<dyn ResultsDoneHook>>) -> Self {
fn new(
fulfiller: oneshot::Sender<Box<dyn ResultsDoneHook>>,
pipeline_sender: crate::queued::PipelineInnerSender,
) -> Self {
Self {
message: Some(::capnp::message::Builder::new_default()),
cap_table: Vec::new(),
results_done_fulfiller: Some(fulfiller),
pipeline_sender: Some(pipeline_sender),
}
}
}
Expand Down Expand Up @@ -126,6 +131,25 @@ impl ResultsHook for Results {
}
}

fn set_pipeline(&mut self) -> capnp::Result<()> {
use ::capnp::traits::ImbueMut;
let root = self.get()?;
let size = root.target_size()?;
let mut message2 = capnp::message::Builder::new(
capnp::message::HeapAllocator::new().first_segment_words(size.word_count as u32 + 1),
);
let mut root2: capnp::any_pointer::Builder = message2.init_root();
let mut cap_table2 = vec![];
root2.imbue_mut(&mut cap_table2);
root2.set_as(root.into_reader())?;
let hook = Box::new(ResultsDone::new(message2, cap_table2)) as Box<dyn ResultsDoneHook>;
let Some(sender) = self.pipeline_sender.take() else {
return Err(Error::failed("set_pipeline() called twice".into()));
};
sender.complete(Box::new(Pipeline::new(hook)));
Ok(())
}

fn tail_call(self: Box<Self>, _request: Box<dyn RequestHook>) -> Promise<(), Error> {
unimplemented!()
}
Expand All @@ -147,12 +171,12 @@ struct ResultsDoneInner {
cap_table: Vec<Option<Box<dyn ClientHook>>>,
}

struct ResultsDone {
pub(crate) struct ResultsDone {
inner: Rc<ResultsDoneInner>,
}

impl ResultsDone {
fn new(
pub(crate) fn new(
message: message::Builder<message::HeapAllocator>,
cap_table: Vec<Option<Box<dyn ClientHook>>>,
) -> Self {
Expand Down Expand Up @@ -181,6 +205,8 @@ pub struct Request {
interface_id: u64,
method_id: u16,
client: Box<dyn ClientHook>,
pipeline: crate::queued::Pipeline,
pipeline_sender: crate::queued::PipelineInnerSender,
}

impl Request {
Expand All @@ -190,12 +216,15 @@ impl Request {
_size_hint: Option<::capnp::MessageSize>,
client: Box<dyn ClientHook>,
) -> Self {
let (pipeline_sender, pipeline) = crate::queued::Pipeline::new();
Self {
message: message::Builder::new_default(),
cap_table: Vec::new(),
interface_id,
method_id,
client,
pipeline,
pipeline_sender,
}
}
}
Expand All @@ -217,17 +246,17 @@ impl RequestHook for Request {
interface_id,
method_id,
client,
mut pipeline,
pipeline_sender,
} = tmp;
let params = Params::new(message, cap_table);

let (results_done_fulfiller, results_done_promise) =
oneshot::channel::<Box<dyn ResultsDoneHook>>();
let results_done_promise = results_done_promise.map_err(crate::canceled_to_error);
let results = Results::new(results_done_fulfiller);
let results = Results::new(results_done_fulfiller, pipeline_sender.weak_clone());
let promise = client.call(interface_id, method_id, Box::new(params), Box::new(results));

let (pipeline_sender, mut pipeline) = crate::queued::Pipeline::new();

let p = futures::future::try_join(promise, results_done_promise).and_then(
move |((), results_done_hook)| {
pipeline_sender
Expand Down
50 changes: 40 additions & 10 deletions capnp-rpc/src/queued.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ pub struct PipelineInner {

impl PipelineInner {
fn resolve(this: &Rc<RefCell<Self>>, result: Result<Box<dyn PipelineHook>, Error>) {
assert!(this.borrow().redirect.is_none());
if this.borrow().redirect.is_some() {
// Already resolved, probably by set_pipeline().
return;
}

let pipeline = match result {
Ok(pipeline_hook) => pipeline_hook,
Err(e) => Box::new(broken::Pipeline::new(e)),
Expand All @@ -66,18 +70,30 @@ impl PipelineInner {

pub struct PipelineInnerSender {
inner: Option<Weak<RefCell<PipelineInner>>>,
resolve_on_drop: bool,
}

impl PipelineInnerSender {
pub(crate) fn weak_clone(&self) -> Self {
Self {
inner: self.inner.clone(),
resolve_on_drop: false,
}
}
}

impl Drop for PipelineInnerSender {
fn drop(&mut self) {
if let Some(weak_queued) = self.inner.take() {
if let Some(pipeline_inner) = weak_queued.upgrade() {
PipelineInner::resolve(
&pipeline_inner,
Ok(Box::new(crate::broken::Pipeline::new(Error::failed(
"PipelineInnerSender was canceled".into(),
)))),
);
if self.resolve_on_drop {
if let Some(weak_queued) = self.inner.take() {
if let Some(pipeline_inner) = weak_queued.upgrade() {
PipelineInner::resolve(
&pipeline_inner,
Ok(Box::new(crate::broken::Pipeline::new(Error::failed(
"PipelineInnerSender was canceled".into(),
)))),
);
}
}
}
}
Expand Down Expand Up @@ -108,6 +124,7 @@ impl Pipeline {
(
PipelineInnerSender {
inner: Some(Rc::downgrade(&inner)),
resolve_on_drop: true,
},
Self { inner },
)
Expand Down Expand Up @@ -271,9 +288,22 @@ impl ClientHook for Client {
.attach(inner_clone)
.and_then(|x| x);

// We need to drive `promise_to_drive` until we have a result.
match self.inner.borrow().promise_to_drive {
Some(ref p) => {
Promise::from_future(futures::future::try_join(p.clone(), promise).map_ok(|v| v.1))
let p1 = p.clone();
Promise::from_future(async move {
match futures::future::select(p1, promise).await {
futures::future::Either::Left((Ok(()), promise)) => promise.await,
futures::future::Either::Left((Err(e), _)) => Err(e),
futures::future::Either::Right((r, _)) => {
// Don't bother waiting for `promise_to_drive` to resolve.
// If we're here because set_pipeline() was called, then
// `promise_to_drive` might in fact never resolve.
r
}
}
})
}
None => Promise::from_future(promise),
}
Expand Down
30 changes: 29 additions & 1 deletion capnp-rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,15 @@ impl<VatId> ConnectionState<VatId> {

let (results_inner_fulfiller, results_inner_promise) = oneshot::channel();
let results_inner_promise = results_inner_promise.map_err(crate::canceled_to_error);

let (pipeline_sender, mut pipeline) = queued::Pipeline::new();
let results = Results::new(
&connection_state,
question_id,
redirect_results,
results_inner_fulfiller,
answer.received_finish.clone(),
Some(pipeline_sender.weak_clone()),
);

let (redirected_results_done_promise, redirected_results_done_fulfiller) =
Expand All @@ -965,7 +968,6 @@ impl<VatId> ConnectionState<VatId> {

let call_promise =
capability.call(interface_id, method_id, Box::new(params), Box::new(results));
let (pipeline_sender, mut pipeline) = queued::Pipeline::new();

let promise = call_promise
.then(move |call_result| {
Expand Down Expand Up @@ -2141,6 +2143,7 @@ where
redirect_results: bool,
answer_id: AnswerId,
finish_received: Rc<Cell<bool>>,
pipeline_sender: Option<queued::PipelineInnerSender>,
}

impl<VatId> ResultsInner<VatId>
Expand Down Expand Up @@ -2195,6 +2198,7 @@ where
redirect_results: bool,
fulfiller: oneshot::Sender<ResultsInner<VatId>>,
finish_received: Rc<Cell<bool>>,
pipeline_sender: Option<queued::PipelineInnerSender>,
) -> Self {
Self {
inner: Some(ResultsInner {
Expand All @@ -2203,6 +2207,7 @@ where
redirect_results,
answer_id,
finish_received,
pipeline_sender,
}),
results_done_fulfiller: Some(fulfiller),
}
Expand Down Expand Up @@ -2250,6 +2255,29 @@ impl<VatId> ResultsHook for Results<VatId> {
}
}

fn set_pipeline(&mut self) -> ::capnp::Result<()> {
use ::capnp::traits::ImbueMut;
let root = self.get()?;
let size = root.target_size()?;
let mut message2 = capnp::message::Builder::new(
capnp::message::HeapAllocator::new().first_segment_words(size.word_count as u32 + 1),
);
let mut root2: capnp::any_pointer::Builder = message2.init_root();
let mut cap_table2 = vec![];
root2.imbue_mut(&mut cap_table2);
root2.set_as(root.into_reader())?;
let hook =
Box::new(local::ResultsDone::new(message2, cap_table2)) as Box<dyn ResultsDoneHook>;
let Some(ref mut inner) = self.inner else {
unreachable!();
};
let Some(sender) = inner.pipeline_sender.take() else {
return Err(Error::failed("set_pipeline() called twice".into()));
};
sender.complete(Box::new(local::Pipeline::new(hook)));
Ok(())
}

fn tail_call(self: Box<Self>, _request: Box<dyn RequestHook>) -> Promise<(), Error> {
unimplemented!()
}
Expand Down
13 changes: 13 additions & 0 deletions capnp-rpc/test/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,19 @@ impl test_pipeline::Server for TestPipeline {
) -> Promise<(), Error> {
Promise::ok(())
}

fn get_cap_pipeline_only(
&mut self,
_params: test_pipeline::GetCapPipelineOnlyParams,
mut results: test_pipeline::GetCapPipelineOnlyResults,
) -> Promise<(), Error> {
results
.get()
.init_out_box()
.set_cap(capnp_rpc::new_client::<test_extends::Client, _>(TestExtends).cast_to());
pry!(results.set_pipeline());
Promise::from_future(::futures::future::pending())
}
}

#[derive(Default)]
Expand Down
3 changes: 3 additions & 0 deletions capnp-rpc/test/test.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ interface TestPipeline {
getNullCap @1 () -> (cap :TestInterface);
testPointers @2 (cap :TestInterface, obj :AnyPointer, list :List(TestInterface)) -> ();

getCapPipelineOnly @3 () -> (outBox :Box);
# Never returns, but uses setPipeline() to make the pipeline work.

struct Box {
cap @0 :TestInterface;

Expand Down
Loading

0 comments on commit bcfb07a

Please sign in to comment.