diff --git a/src/clients/worker.rs b/src/clients/worker.rs index 50eaae3..80b5bdf 100644 --- a/src/clients/worker.rs +++ b/src/clients/worker.rs @@ -305,59 +305,71 @@ impl WorkerClient for WorkerCl } }); - let read_res = read.for_each(|message| async { - let message: Message = message.unwrap(); - - let msg = match message { - Message::Text(str) => { - let parsed: serde_json::Result = - serde_json::from_str(&str); - Some(parsed.unwrap()) // TODO: error handling + let read_res = read.for_each(|message_or_error| async { + match message_or_error { + Err(error) => { + print!("Error reading message: {}", error); } - Message::Binary(data) => { - let parsed: serde_json::Result = - serde_json::from_slice(&data); - Some(parsed.unwrap()) // TODO: error handling - } - Message::Ping(_) => { - debug!("Ignore ping"); - None - } - Message::Pong(_) => { - debug!("Ignore pong"); - None - } - Message::Close(_) => { - info!("Ignore unexpected close"); - None - } - Message::Frame(_) => { - info!("Ignore unexpected frame"); - None - } - }; - - match msg { - None => {} - Some(msg) => match msg.event { - WorkerEvent::Stdout(StdOutLog { message }) => { - print!("{message}") - } - WorkerEvent::Stderr(StdErrLog { message }) => { - print!("{message}") + Ok(message) => { + let instance_connect_msg = match message { + Message::Text(str) => { + let parsed: serde_json::Result = + serde_json::from_str(&str); + Some(parsed.unwrap()) // TODO: error handling + } + Message::Binary(data) => { + let parsed: serde_json::Result = + serde_json::from_slice(&data); + Some(parsed.unwrap()) // TODO: error handling + } + Message::Ping(_) => { + debug!("Ignore ping"); + None + } + Message::Pong(_) => { + debug!("Ignore pong"); + None + } + Message::Close(details) => { + match details { + Some(closed_frame) => { + print!("Connection Closed: {}", closed_frame); + } + None => { + print!("Connection Closed"); + } + } + None + } + Message::Frame(_) => { + info!("Ignore unexpected frame"); + None + } + }; + + match instance_connect_msg { + None => {} + Some(msg) => match msg.event { + WorkerEvent::Stdout(StdOutLog { message }) => { + print!("{message}") + } + WorkerEvent::Stderr(StdErrLog { message }) => { + print!("{message}") + } + WorkerEvent::Log(Log { + level, + context, + message, + }) => match level { + 0 => tracing::trace!(message, context = context), + 1 => tracing::debug!(message, context = context), + 2 => tracing::info!(message, context = context), + 3 => tracing::warn!(message, context = context), + _ => tracing::error!(message, context = context), + }, + }, } - WorkerEvent::Log(Log { - level, - context, - message, - }) => match level { - 0 => tracing::trace!(message, context = context), - 1 => tracing::debug!(message, context = context), - 2 => tracing::info!(message, context = context), - 3 => tracing::warn!(message, context = context), - _ => tracing::error!(message, context = context), - }, - }, + } } }); diff --git a/src/worker.rs b/src/worker.rs index 7313ecc..7411ba4 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -213,9 +213,12 @@ impl<'r, C: WorkerClient + Send + Sync, R: TemplateHandler + Send + Sync> Worker } => { let template_id = self.templates.resolve_id(template_id_or_name).await?; - self.client.connect(worker_name, template_id).await?; + let result = self.client.connect(worker_name, template_id).await; - Err(GolemError("connect should never complete".to_string())) + match result { + Ok(_) => Err(GolemError("Unexpected connection closure".to_string())), + Err(err) => Err(GolemError(err.to_string())), + } } WorkerSubcommand::Interrupt { template_id_or_name,