Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloud related fixes #218

Merged
merged 17 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,13 @@ pub async fn info(
State(worker_ctx): State<Arc<dyn WorkerCtx>>,
Path(InfoParams { name_or_address }): Path<InfoParams>,
) -> axum::response::Result<impl IntoResponse> {
log::trace!("Trying to resolve address: {:?}", name_or_address);
let address = name_or_address.resolve(&*worker_ctx).await?.into();
log::trace!("Resolved address to: {address:?}");
let database = worker_ctx_find_database(&*worker_ctx, &address)
.await?
.ok_or((StatusCode::NOT_FOUND, "No such database."))?;
log::trace!("Fetched database from the worker db for address: {address:?}");

let host_type = match database.host_type {
HostType::Wasmer => "wasmer",
Expand Down
4 changes: 2 additions & 2 deletions crates/client-api/src/routes/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use spacetimedb::auth::identity::encode_token_with_expiry;
use spacetimedb_lib::de::serde::DeserializeWrapper;
use spacetimedb_lib::Identity;

use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader};
pub use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader};
use crate::{log_and_500, ControlCtx, ControlNodeDelegate};

#[derive(Deserialize)]
Expand Down Expand Up @@ -175,7 +175,7 @@ pub async fn get_databases(

#[derive(Debug, Serialize)]
pub struct WebsocketTokenResponse {
token: String,
pub token: String,
}

pub async fn create_websocket_token(
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl headers::Header for XForwardedFor {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum NameOrAddress {
Address(Address),
Name(String),
Expand Down
20 changes: 11 additions & 9 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use std::fmt;
use std::ops::Sub;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};

use super::module_host::{
Catalog, EntityDef, EventStatus, ModuleHost, ModuleStarter, NoSuchModule, UpdateDatabaseResult,
Expand Down Expand Up @@ -217,7 +217,7 @@ impl HostController {
let key = module_host_context.dbic.database_instance_id;

let (module_host, start_module, start_scheduler) =
tokio::task::block_in_place(|| Self::make_module_host(module_host_context, self.energy_monitor.clone()))?;
Self::make_module_host(module_host_context, self.energy_monitor.clone())?;

let old_module = self.modules.lock().unwrap().insert(key, module_host.clone());
if let Some(old_module) = old_module {
Expand All @@ -235,13 +235,15 @@ impl HostController {
) -> anyhow::Result<(ModuleHost, ModuleStarter, SchedulerStarter)> {
let module_hash = hash_bytes(&mhc.program_bytes);
let (module_host, module_starter) = match mhc.host_type {
HostType::Wasmer => ModuleHost::spawn(wasmer::make_actor(
mhc.dbic,
module_hash,
&mhc.program_bytes,
mhc.scheduler,
energy_monitor,
)?),
HostType::Wasmer => {
// make_actor with block_in_place since it's going to take some time to compute.
let start = Instant::now();
let actor = tokio::task::block_in_place(|| {
wasmer::make_actor(mhc.dbic, module_hash, &mhc.program_bytes, mhc.scheduler, energy_monitor)
})?;
log::trace!("wasmer::make_actor blocked for {}us", start.elapsed().as_micros());
ModuleHost::spawn(actor)
}
};
Ok((module_host, module_starter, mhc.scheduler_starter))
}
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,15 @@ impl ModuleHost {
let (tx, rx) = mpsc::channel(8);
let (start_tx, start_rx) = oneshot::channel();
let info = actor.info();
tokio::task::spawn_blocking(|| {
let _ = start_rx.blocking_recv();
Self::run_actor(rx, actor)
tokio::spawn(async move {
let _ = start_rx.await;
Self::run_actor(rx, actor).await
});
(ModuleHost { info, tx }, ModuleStarter { tx: start_tx })
}

fn run_actor(mut rx: mpsc::Receiver<CmdOrExit>, mut actor: impl ModuleHostActor) {
while let Some(command) = rx.blocking_recv() {
async fn run_actor(mut rx: mpsc::Receiver<CmdOrExit>, mut actor: impl ModuleHostActor) {
while let Some(command) = rx.recv().await {
match command {
CmdOrExit::Cmd(command) => command.dispatch(&mut actor),
CmdOrExit::Exit => rx.close(),
Expand Down
19 changes: 16 additions & 3 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
scheduler: Scheduler,
energy_monitor: Arc<dyn EnergyMonitor>,
) -> Result<Self, InitializationError> {
log::trace!(
"Making new module host actor for database {}",
database_instance_context.address
);
let log_tx = database_instance_context.logger.lock().unwrap().tx.clone();

FuncNames::check_required(|name| module.get_export(name))?;
Expand Down Expand Up @@ -208,6 +212,10 @@ impl<T: WasmInstancePre> JobRunnerSeed for InstanceSeed<T> {
type Runner = WasmInstanceActor<T::Instance>;
type Job = InstanceMessage;
fn make_runner(&self) -> Self::Runner {
log::trace!(
"Making new runner for database {}",
self.worker_database_instance.address
);
let env = InstanceEnv::new(self.worker_database_instance.clone(), self.scheduler.clone());
// this shouldn't fail, since we already called module.create_instance()
// before and it didn't error, and ideally they should be deterministic
Expand Down Expand Up @@ -289,6 +297,7 @@ impl<S: JobRunnerSeed> JobPool<S> {
}
});
}

fn spawn(&self) {
self.spawn_from_runner(self.seed().make_runner())
}
Expand Down Expand Up @@ -490,10 +499,14 @@ impl<T: WasmInstance> WasmInstanceActor<T> {
stdb.with_auto_commit::<_, _, anyhow::Error>(|tx| {
for table in self.info.catalog.values().filter_map(EntityDef::as_table) {
let schema = self.schema_for(table)?;
stdb.create_table(tx, schema)
.with_context(|| format!("failed to create table {}", table.name))?;
let result = stdb
.create_table(tx, schema)
.with_context(|| format!("failed to create table {}", table.name));
if let Err(err) = result {
log::error!("{:?}", err);
return Err(err);
}
}

Ok(())
})?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/messages/worker_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use spacetimedb_sats::de::Deserialize;
use spacetimedb_sats::ser::Serialize;

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DatabaseInstanceState {
pub database_instance_id: u64,
pub initialized: bool,
Expand Down
8 changes: 7 additions & 1 deletion crates/lib/src/address.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::Ipv6Addr;
use std::{fmt::Display, net::Ipv6Addr};

use anyhow::Context as _;
use hex::FromHex as _;
Expand All @@ -15,6 +15,12 @@ use crate::sats;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Address(u128);

impl Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_hex())
}
}

impl Address {
const ABBREVIATION_LEN: usize = 16;

Expand Down
2 changes: 1 addition & 1 deletion crates/sdk/examples/cursive-chat/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn register_callbacks(send: UiSend) {
on_set_name(on_name_set(send.clone()));

// When we fail to send a message, print a warning.
on_send_message(on_message_sent(send.clone()));
on_send_message(on_message_sent(send));
}

// ## Save credentials to a file
Expand Down
4 changes: 2 additions & 2 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,10 @@ impl StandaloneEnv {
database_instance_id: instance.id,
initialized: false,
};
state.initialized = true;
self.worker_db.upsert_database_instance_state(state.clone()).unwrap();
self.init_module_on_database_instance(instance.database_id, instance.id)
.await?;
self.worker_db.upsert_database_instance_state(state.clone()).unwrap();
state.initialized = true;
self.worker_db.upsert_database_instance_state(state).unwrap();
Ok(())
}
Expand Down
Loading