Skip to content

Commit

Permalink
util: Fix call_all hang when stream is pending (#656)
Browse files Browse the repository at this point in the history
Currently `call_all` will hang in a busy loop if called when the input
stream is pending.
  • Loading branch information
leoyvens authored and hawkw committed Jun 17, 2022
1 parent 7d81577 commit d989a82
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
26 changes: 12 additions & 14 deletions tower/src/util/call_all/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,18 @@ where
.expect("Using CallAll after extracing inner Service");
ready!(svc.poll_ready(cx)).map_err(Into::into)?;

// If it is, gather the next request (if there is one)
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(r) => match r {
Some(req) => {
this.queue.push(svc.call(req));
}
None => {
// We're all done once any outstanding requests have completed
*this.eof = true;
}
},
Poll::Pending => {
// TODO: We probably want to "release" the slot we reserved in Svc here.
// It may be a while until we get around to actually using it.
// If it is, gather the next request (if there is one), or return `Pending` if the
// stream is not ready.
// TODO: We probably want to "release" the slot we reserved in Svc if the
// stream returns `Pending`. It may be a while until we get around to actually
// using it.
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(req) => {
this.queue.push(svc.call(req));
}
None => {
// We're all done once any outstanding requests have completed
*this.eof = true;
}
}
}
Expand Down
21 changes: 21 additions & 0 deletions tower/tests/util/call_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,24 @@ async fn unordered() {
.unwrap();
assert!(v.is_none());
}

#[tokio::test]
async fn pending() {
let _t = support::trace_init();

let (mock, mut handle) = mock::pair::<_, &'static str>();

let mut task = task::spawn(());

let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let ca = mock.call_all(support::IntoStream::new(rx));
pin_mut!(ca);

assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
tx.send("req").unwrap();
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
assert_request_eq!(handle, "req").send_response("res");
let res = assert_ready!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
assert_eq!(res.transpose().unwrap(), Some("res"));
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
}

0 comments on commit d989a82

Please sign in to comment.