From 046656fd4844f7f9ae66b4c7d55cbb47fedf4601 Mon Sep 17 00:00:00 2001 From: Afsal Thaj Date: Mon, 29 Jan 2024 10:14:19 +1100 Subject: [PATCH 1/3] Allow websocket connection closure to emit errors, and fix confusing message --- src/clients/worker.rs | 113 +++++++++++++++++++++++------------------- src/worker.rs | 7 ++- 2 files changed, 68 insertions(+), 52 deletions(-) diff --git a/src/clients/worker.rs b/src/clients/worker.rs index 50eaae3..04958c5 100644 --- a/src/clients/worker.rs +++ b/src/clients/worker.rs @@ -306,58 +306,71 @@ impl WorkerClient for WorkerCl }); let read_res = read.for_each(|message| async { - let message: Message = message.unwrap(); - - let msg = match message { - Message::Text(str) => { - let parsed: serde_json::Result = - serde_json::from_str(&str); - Some(parsed.unwrap()) // TODO: error handling - } - Message::Binary(data) => { - let parsed: serde_json::Result = - serde_json::from_slice(&data); - Some(parsed.unwrap()) // TODO: error handling - } - Message::Ping(_) => { - debug!("Ignore ping"); - None - } - Message::Pong(_) => { - debug!("Ignore pong"); - None - } - Message::Close(_) => { - info!("Ignore unexpected close"); - None + match message { + Err(e) => { + print!("Error reading message: {}", e); + return; } - Message::Frame(_) => { - info!("Ignore unexpected frame"); - None - } - }; - - match msg { - None => {} - Some(msg) => match msg.event { - WorkerEvent::Stdout(StdOutLog { message }) => { - print!("{message}") - } - WorkerEvent::Stderr(StdErrLog { message }) => { - print!("{message}") + Ok(msgg) => { + let msg = match msgg { + Message::Text(str) => { + let parsed: serde_json::Result = + serde_json::from_str(&str); + Some(parsed.unwrap()) // TODO: error handling + } + Message::Binary(data) => { + let parsed: serde_json::Result = + serde_json::from_slice(&data); + Some(parsed.unwrap()) // TODO: error handling + } + Message::Ping(_) => { + debug!("Ignore ping"); + None + } + Message::Pong(_) => { + debug!("Ignore pong"); + None + } + Message::Close(details) => { + match details { + Some(closed_frame) => { + print!("Connection Closed: {}", closed_frame); + } + None => { + print!("Connection Closed"); + } + } + None + } + Message::Frame(_) => { + info!("Ignore unexpected frame"); + None + } + }; + + match msg { + None => {} + Some(msg) => match msg.event { + WorkerEvent::Stdout(StdOutLog { message }) => { + print!("{message}") + } + WorkerEvent::Stderr(StdErrLog { message }) => { + print!("{message}") + } + WorkerEvent::Log(Log { + level, + context, + message, + }) => match level { + 0 => tracing::trace!(message, context = context), + 1 => tracing::debug!(message, context = context), + 2 => tracing::info!(message, context = context), + 3 => tracing::warn!(message, context = context), + _ => tracing::error!(message, context = context), + }, + }, } - WorkerEvent::Log(Log { - level, - context, - message, - }) => match level { - 0 => tracing::trace!(message, context = context), - 1 => tracing::debug!(message, context = context), - 2 => tracing::info!(message, context = context), - 3 => tracing::warn!(message, context = context), - _ => tracing::error!(message, context = context), - }, - }, + } } }); diff --git a/src/worker.rs b/src/worker.rs index 7313ecc..7411ba4 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -213,9 +213,12 @@ impl<'r, C: WorkerClient + Send + Sync, R: TemplateHandler + Send + Sync> Worker } => { let template_id = self.templates.resolve_id(template_id_or_name).await?; - self.client.connect(worker_name, template_id).await?; + let result = self.client.connect(worker_name, template_id).await; - Err(GolemError("connect should never complete".to_string())) + match result { + Ok(_) => Err(GolemError("Unexpected connection closure".to_string())), + Err(err) => Err(GolemError(err.to_string())), + } } WorkerSubcommand::Interrupt { template_id_or_name, From a34226b719d3f0b76d34258c70323e853e03e1f5 Mon Sep 17 00:00:00 2001 From: Afsal Thaj Date: Mon, 29 Jan 2024 12:06:57 +1100 Subject: [PATCH 2/3] Remove return --- src/clients/worker.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/clients/worker.rs b/src/clients/worker.rs index 04958c5..db0c8fc 100644 --- a/src/clients/worker.rs +++ b/src/clients/worker.rs @@ -309,7 +309,6 @@ impl WorkerClient for WorkerCl match message { Err(e) => { print!("Error reading message: {}", e); - return; } Ok(msgg) => { let msg = match msgg { From 03dbde9904d0b524274b5a9dfde9a7398cf3df77 Mon Sep 17 00:00:00 2001 From: Afsal Thaj Date: Mon, 29 Jan 2024 12:10:39 +1100 Subject: [PATCH 3/3] Clean up namings --- src/clients/worker.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/clients/worker.rs b/src/clients/worker.rs index db0c8fc..80b5bdf 100644 --- a/src/clients/worker.rs +++ b/src/clients/worker.rs @@ -305,13 +305,13 @@ impl WorkerClient for WorkerCl } }); - let read_res = read.for_each(|message| async { - match message { - Err(e) => { - print!("Error reading message: {}", e); + let read_res = read.for_each(|message_or_error| async { + match message_or_error { + Err(error) => { + print!("Error reading message: {}", error); } - Ok(msgg) => { - let msg = match msgg { + Ok(message) => { + let instance_connect_msg = match message { Message::Text(str) => { let parsed: serde_json::Result = serde_json::from_str(&str); @@ -347,7 +347,7 @@ impl WorkerClient for WorkerCl } }; - match msg { + match instance_connect_msg { None => {} Some(msg) => match msg.event { WorkerEvent::Stdout(StdOutLog { message }) => {