Skip to content

Commit

Permalink
Preserve runtime output stream order
Browse files Browse the repository at this point in the history
  • Loading branch information
mfranciszkiewicz committed Jan 15, 2021
1 parent dbaf759 commit 191822a
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion exe-unit/runtime-api/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ya-runtime-api"
version = "0.1.1"
version = "0.1.2"
authors = ["Golem Factory <contact@golem.network>"]
edition = "2018"

Expand Down
27 changes: 10 additions & 17 deletions exe-unit/runtime-api/src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,9 @@ where
async move {
while let Some(event) = rx.next().await {
log::debug!("event: {:?}", event);
tokio::task::spawn_local({
let output = output.clone();
async move {
let mut output = output.lock().await;
let r = SinkExt::send(&mut *output, event).await;
log::debug!("sending event done: {:?}", r);
}
});
let mut output = output.lock().await;
let r = SinkExt::send(&mut *output, event).await;
log::debug!("sending event done: {:?}", r);
}
}
});
Expand All @@ -104,15 +99,13 @@ where
Ok(request) => {
let service = service.clone();
let output = output.clone();
tokio::task::spawn_local(async move {
log::debug!("received request: {:?}", request);
let resp = handle(service.as_ref(), request).await;
log::debug!("response to send: {:?}", resp);
let mut output = output.lock().await;
log::debug!("sending");
let r = SinkExt::send(&mut *output, resp).await;
log::debug!("sending done: {:?}", r);
});
log::debug!("received request: {:?}", request);
let resp = handle(service.as_ref(), request).await;
log::debug!("response to send: {:?}", resp);
let mut output = output.lock().await;
log::debug!("sending");
let r = SinkExt::send(&mut *output, resp).await;
log::debug!("sending done: {:?}", r);
}
Err(e) => {
log::error!("fail: {}", e);
Expand Down
2 changes: 1 addition & 1 deletion service-bus/bus/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ where
Err(Error::GsbFailure(String::from_utf8(chunk.into_bytes())?))
}
};
let _ = ctx.spawn(
let _ = ctx.wait(
async move {
let s = r.send(item);
s.await
Expand Down

0 comments on commit 191822a

Please sign in to comment.