Skip to content

Commit

Permalink
Inject logs into module on failure to invoke reducer
Browse files Browse the repository at this point in the history
  • Loading branch information
kazimuth committed Jul 24, 2023
1 parent 92743d2 commit 9ad9ae7
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 6 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl DecodedMessage<'_> {

/// An error that arises from
#[derive(thiserror::Error, Debug)]
#[error("error executing message (reducer: {reducer:?})")]
#[error("error executing message (reducer: {reducer:?}) (err: {err:?})")]
pub struct MessageExecutionError {
pub reducer: Option<String>,
pub caller_identity: Identity,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/database_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub struct DatabaseLogger {
pub tx: broadcast::Sender<bytes::Bytes>,
}

#[derive(Clone, Copy, PartialEq, Eq)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum LogLevel {
Error,
Warn,
Expand Down
41 changes: 38 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::client::ClientConnectionSender;
use crate::database_logger::LogLevel;
use crate::db::datastore::traits::{TableId, TxData, TxOp};
use crate::db::relational_db::RelationalDB;
use crate::error::DBError;
Expand Down Expand Up @@ -215,6 +216,11 @@ enum ModuleHostCommand {
StopTrace {
respond_to: oneshot::Sender<anyhow::Result<()>>,
},
InjectLogs {
respond_to: oneshot::Sender<()>,
log_level: LogLevel,
message: String,
},
}

impl ModuleHostCommand {
Expand Down Expand Up @@ -242,6 +248,7 @@ impl ModuleHostCommand {
ModuleHostCommand::StopTrace { respond_to } => {
let _ = respond_to.send(actor.stop_trace());
}
ModuleHostCommand::InjectLogs { respond_to, log_level, message } => actor.inject_logs(respond_to, log_level, message),
}
}
}
Expand Down Expand Up @@ -280,6 +287,7 @@ pub trait ModuleHostActor: Send + 'static {
fn get_trace(&self) -> Option<bytes::Bytes>;
#[cfg(feature = "tracelogging")]
fn stop_trace(&mut self) -> Result<(), anyhow::Error>;
fn inject_logs(&self, respond_to: oneshot::Sender<()>, log_level: LogLevel, message: String);
fn close(self);
}

Expand Down Expand Up @@ -409,12 +417,34 @@ impl ModuleHost {
reducer_name: &str,
args: ReducerArgs,
) -> Result<ReducerCallResult, ReducerCallError> {
let (reducer_id, _, schema) = self
let found_reducer = self
.info
.reducers
.get_full(reducer_name)
.ok_or(ReducerCallError::NoSuchReducer)?;
let args = args.into_tuple(self.info.typespace.with_type(schema))?;
.ok_or(ReducerCallError::NoSuchReducer);
let (reducer_id, _, schema) = match found_reducer {
Ok(ok) => ok,
Err(err) => {
let _ = self.inject_logs(LogLevel::Error, format!(
"External attempt to call nonexistent reducer \"{}\" failed. Have you run `spacetime generate` recently?",
reducer_name
)).await;
Err(err)?
}
};

let args = args.into_tuple(self.info.typespace.with_type(schema));
let args = match args {
Ok(ok) => ok,
Err(err) => {
let _ = self.inject_logs(LogLevel::Error, format!(
"External attempt to call reducer \"{}\" failed, invalid arguments.\nThis is likely due to a mismatched client schema, have you run `spacetime generate` recently?",
reducer_name,
)).await;
Err(err)?
}
};

self.call(|respond_to| ModuleHostCommand::CallReducer {
caller_identity,
client,
Expand Down Expand Up @@ -472,6 +502,11 @@ impl ModuleHost {
.await?
}

pub async fn inject_logs(&self, log_level: LogLevel, message: String) -> Result<(), NoSuchModule> {
self.call(|respond_to| ModuleHostCommand::InjectLogs { respond_to, log_level, message })
.await
}

pub fn downgrade(&self) -> WeakModuleHost {
WeakModuleHost {
info: self.info.clone(),
Expand Down
23 changes: 22 additions & 1 deletion crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::sync::oneshot;

use crate::client::ClientConnectionSender;
use crate::database_instance_context::DatabaseInstanceContext;
use crate::database_logger::{DatabaseLogger, Record};
use crate::database_logger::{DatabaseLogger, LogLevel, Record};
use crate::hash::Hash;
use crate::host::instance_env::InstanceEnv;
use crate::host::module_host::{
Expand Down Expand Up @@ -407,6 +407,10 @@ impl<T: WasmModule> ModuleHostActor for WasmModuleHostActor<T> {
})
}

fn inject_logs(&self, respond_to: oneshot::Sender<()>, log_level: LogLevel, message: String) {
self.instances.send(InstanceMessage::InjectLogs { respond_to, log_level, message })
}

fn close(self) {
self.instances.seed().scheduler.close();
self.instances.join()
Expand Down Expand Up @@ -484,6 +488,18 @@ impl<T: WasmInstance> JobRunner for WasmInstanceActor<T> {
InstanceMessage::UpdateDatabase { respond_to } => {
let _ = respond_to.send(self.update_database());
}
InstanceMessage::InjectLogs { respond_to, log_level, message } => {
let _ = respond_to.send(self.instance.instance_env().console_log(
log_level,
&Record {
target: None,
filename: Some("external"),
line_number: None,
message: &message,
},
&(),
));
},
}
if self.trapped {
ControlFlow::Break(())
Expand Down Expand Up @@ -930,4 +946,9 @@ enum InstanceMessage {
UpdateDatabase {
respond_to: oneshot::Sender<Result<UpdateDatabaseResult, anyhow::Error>>,
},
InjectLogs {
respond_to: oneshot::Sender<()>,
log_level: LogLevel,
message: String,
},
}

0 comments on commit 9ad9ae7

Please sign in to comment.