From debba32216c64ff219c2c8502ec89e000ef53208 Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Fri, 15 Jan 2021 09:26:29 +0100 Subject: [PATCH 1/2] Preserve runtime output stream order --- Cargo.lock | 2 +- exe-unit/runtime-api/Cargo.toml | 2 +- exe-unit/runtime-api/src/server/service.rs | 11 +++-------- service-bus/bus/src/connection.rs | 2 +- 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1be828412..be1bacd336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7446,7 +7446,7 @@ dependencies = [ [[package]] name = "ya-runtime-api" -version = "0.1.1" +version = "0.1.2" dependencies = [ "anyhow", "bytes 0.5.6", diff --git a/exe-unit/runtime-api/Cargo.toml b/exe-unit/runtime-api/Cargo.toml index c2376c09be..f61dd65def 100644 --- a/exe-unit/runtime-api/Cargo.toml +++ b/exe-unit/runtime-api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ya-runtime-api" -version = "0.1.1" +version = "0.1.2" authors = ["Golem Factory "] edition = "2018" diff --git a/exe-unit/runtime-api/src/server/service.rs b/exe-unit/runtime-api/src/server/service.rs index 8de4a54ea1..9d91fee7b7 100644 --- a/exe-unit/runtime-api/src/server/service.rs +++ b/exe-unit/runtime-api/src/server/service.rs @@ -85,14 +85,9 @@ where async move { while let Some(event) = rx.next().await { log::debug!("event: {:?}", event); - tokio::task::spawn_local({ - let output = output.clone(); - async move { - let mut output = output.lock().await; - let r = SinkExt::send(&mut *output, event).await; - log::debug!("sending event done: {:?}", r); - } - }); + let mut output = output.lock().await; + let r = SinkExt::send(&mut *output, event).await; + log::debug!("sending event done: {:?}", r); } } }); diff --git a/service-bus/bus/src/connection.rs b/service-bus/bus/src/connection.rs index 948d9dcb15..bc1d201e93 100644 --- a/service-bus/bus/src/connection.rs +++ b/service-bus/bus/src/connection.rs @@ -373,7 +373,7 @@ where Err(Error::GsbFailure(String::from_utf8(chunk.into_bytes())?)) } }; - let _ = ctx.spawn( + let _ = ctx.wait( async move { let s = r.send(item); s.await From a87de3c6792a5780c737fba0e4df84b8094be5df Mon Sep 17 00:00:00 2001 From: mfranciszkiewicz Date: Fri, 15 Jan 2021 15:16:29 +0100 Subject: [PATCH 2/2] increase mailbox capacity of Connection from 16 to 256 --- service-bus/bus/src/connection.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/service-bus/bus/src/connection.rs b/service-bus/bus/src/connection.rs index bc1d201e93..9c9f1e8cec 100644 --- a/service-bus/bus/src/connection.rs +++ b/service-bus/bus/src/connection.rs @@ -401,7 +401,8 @@ where { type Context = Context; - fn started(&mut self, _ctx: &mut Self::Context) { + fn started(&mut self, ctx: &mut Self::Context) { + ctx.set_mailbox_capacity(256); log::info!("started connection to gsb"); }