Skip to content

Commit

Permalink
Cloud related fixes (#218)
Browse files Browse the repository at this point in the history
* Remove unused datastore traits

We previously anticipated factoring the database in such a way that
the typed vs untyped distinctions, and the transactional vs not,
semantics would be more visible. We have not used these distinctions
and no longer need to expose these traits.

* Fix recovery of sequences after restart

* Debug

* More debug logs

* Export more tyeps

* More debug info and small changes

* More logs

* Change waiting on messages from client to be async

* Remove some trace lines

* Revert "Fix recovery of sequences after restart"

This reverts commit eef0089.

* Remove more trace lines

* cargo fmt

* clippy

---------

Signed-off-by: Tyler Cloutier <cloutiertyler@users.noreply.github.com>
Co-authored-by: George Kulakowski <george@clockworklabs.io>
Co-authored-by: Mario Alejandro Montoya Cortés <mamcx@elmalabarista.com>
Co-authored-by: Tyler Cloutier <cloutiertyler@aol.com>
Co-authored-by: Tyler Cloutier <cloutiertyler@users.noreply.github.com>
  • Loading branch information
5 people authored Aug 24, 2023
1 parent e72b8f3 commit f1406e4
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 25 deletions.
3 changes: 3 additions & 0 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,13 @@ pub async fn info(
State(worker_ctx): State<Arc<dyn WorkerCtx>>,
Path(InfoParams { name_or_address }): Path<InfoParams>,
) -> axum::response::Result<impl IntoResponse> {
log::trace!("Trying to resolve address: {:?}", name_or_address);
let address = name_or_address.resolve(&*worker_ctx).await?.into();
log::trace!("Resolved address to: {address:?}");
let database = worker_ctx_find_database(&*worker_ctx, &address)
.await?
.ok_or((StatusCode::NOT_FOUND, "No such database."))?;
log::trace!("Fetched database from the worker db for address: {address:?}");

let host_type = match database.host_type {
HostType::Wasmer => "wasmer",
Expand Down
4 changes: 2 additions & 2 deletions crates/client-api/src/routes/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use spacetimedb::auth::identity::encode_token_with_expiry;
use spacetimedb_lib::de::serde::DeserializeWrapper;
use spacetimedb_lib::Identity;

use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader};
pub use crate::auth::{SpacetimeAuth, SpacetimeAuthHeader};
use crate::{log_and_500, ControlCtx, ControlNodeDelegate};

#[derive(Deserialize)]
Expand Down Expand Up @@ -175,7 +175,7 @@ pub async fn get_databases(

#[derive(Debug, Serialize)]
pub struct WebsocketTokenResponse {
token: String,
pub token: String,
}

pub async fn create_websocket_token(
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl headers::Header for XForwardedFor {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum NameOrAddress {
Address(Address),
Name(String),
Expand Down
20 changes: 11 additions & 9 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use std::fmt;
use std::ops::Sub;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};

use super::module_host::{
Catalog, EntityDef, EventStatus, ModuleHost, ModuleStarter, NoSuchModule, UpdateDatabaseResult,
Expand Down Expand Up @@ -217,7 +217,7 @@ impl HostController {
let key = module_host_context.dbic.database_instance_id;

let (module_host, start_module, start_scheduler) =
tokio::task::block_in_place(|| Self::make_module_host(module_host_context, self.energy_monitor.clone()))?;
Self::make_module_host(module_host_context, self.energy_monitor.clone())?;

let old_module = self.modules.lock().unwrap().insert(key, module_host.clone());
if let Some(old_module) = old_module {
Expand All @@ -235,13 +235,15 @@ impl HostController {
) -> anyhow::Result<(ModuleHost, ModuleStarter, SchedulerStarter)> {
let module_hash = hash_bytes(&mhc.program_bytes);
let (module_host, module_starter) = match mhc.host_type {
HostType::Wasmer => ModuleHost::spawn(wasmer::make_actor(
mhc.dbic,
module_hash,
&mhc.program_bytes,
mhc.scheduler,
energy_monitor,
)?),
HostType::Wasmer => {
// make_actor with block_in_place since it's going to take some time to compute.
let start = Instant::now();
let actor = tokio::task::block_in_place(|| {
wasmer::make_actor(mhc.dbic, module_hash, &mhc.program_bytes, mhc.scheduler, energy_monitor)
})?;
log::trace!("wasmer::make_actor blocked for {}us", start.elapsed().as_micros());
ModuleHost::spawn(actor)
}
};
Ok((module_host, module_starter, mhc.scheduler_starter))
}
Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,15 @@ impl ModuleHost {
let (tx, rx) = mpsc::channel(8);
let (start_tx, start_rx) = oneshot::channel();
let info = actor.info();
tokio::task::spawn_blocking(|| {
let _ = start_rx.blocking_recv();
Self::run_actor(rx, actor)
tokio::spawn(async move {
let _ = start_rx.await;
Self::run_actor(rx, actor).await
});
(ModuleHost { info, tx }, ModuleStarter { tx: start_tx })
}

fn run_actor(mut rx: mpsc::Receiver<CmdOrExit>, mut actor: impl ModuleHostActor) {
while let Some(command) = rx.blocking_recv() {
async fn run_actor(mut rx: mpsc::Receiver<CmdOrExit>, mut actor: impl ModuleHostActor) {
while let Some(command) = rx.recv().await {
match command {
CmdOrExit::Cmd(command) => command.dispatch(&mut actor),
CmdOrExit::Exit => rx.close(),
Expand Down
19 changes: 16 additions & 3 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ impl<T: WasmModule> WasmModuleHostActor<T> {
scheduler: Scheduler,
energy_monitor: Arc<dyn EnergyMonitor>,
) -> Result<Self, InitializationError> {
log::trace!(
"Making new module host actor for database {}",
database_instance_context.address
);
let log_tx = database_instance_context.logger.lock().unwrap().tx.clone();

FuncNames::check_required(|name| module.get_export(name))?;
Expand Down Expand Up @@ -208,6 +212,10 @@ impl<T: WasmInstancePre> JobRunnerSeed for InstanceSeed<T> {
type Runner = WasmInstanceActor<T::Instance>;
type Job = InstanceMessage;
fn make_runner(&self) -> Self::Runner {
log::trace!(
"Making new runner for database {}",
self.worker_database_instance.address
);
let env = InstanceEnv::new(self.worker_database_instance.clone(), self.scheduler.clone());
// this shouldn't fail, since we already called module.create_instance()
// before and it didn't error, and ideally they should be deterministic
Expand Down Expand Up @@ -289,6 +297,7 @@ impl<S: JobRunnerSeed> JobPool<S> {
}
});
}

fn spawn(&self) {
self.spawn_from_runner(self.seed().make_runner())
}
Expand Down Expand Up @@ -490,10 +499,14 @@ impl<T: WasmInstance> WasmInstanceActor<T> {
stdb.with_auto_commit::<_, _, anyhow::Error>(|tx| {
for table in self.info.catalog.values().filter_map(EntityDef::as_table) {
let schema = self.schema_for(table)?;
stdb.create_table(tx, schema)
.with_context(|| format!("failed to create table {}", table.name))?;
let result = stdb
.create_table(tx, schema)
.with_context(|| format!("failed to create table {}", table.name));
if let Err(err) = result {
log::error!("{:?}", err);
return Err(err);
}
}

Ok(())
})?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/messages/worker_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use spacetimedb_sats::de::Deserialize;
use spacetimedb_sats::ser::Serialize;

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DatabaseInstanceState {
pub database_instance_id: u64,
pub initialized: bool,
Expand Down
8 changes: 7 additions & 1 deletion crates/lib/src/address.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::Ipv6Addr;
use std::{fmt::Display, net::Ipv6Addr};

use anyhow::Context as _;
use hex::FromHex as _;
Expand All @@ -15,6 +15,12 @@ use crate::sats;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Address(u128);

impl Display for Address {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.to_hex())
}
}

impl Address {
const ABBREVIATION_LEN: usize = 16;

Expand Down
2 changes: 1 addition & 1 deletion crates/sdk/examples/cursive-chat/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn register_callbacks(send: UiSend) {
on_set_name(on_name_set(send.clone()));

// When we fail to send a message, print a warning.
on_send_message(on_message_sent(send.clone()));
on_send_message(on_message_sent(send));
}

// ## Save credentials to a file
Expand Down
4 changes: 2 additions & 2 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,10 @@ impl StandaloneEnv {
database_instance_id: instance.id,
initialized: false,
};
state.initialized = true;
self.worker_db.upsert_database_instance_state(state.clone()).unwrap();
self.init_module_on_database_instance(instance.database_id, instance.id)
.await?;
self.worker_db.upsert_database_instance_state(state.clone()).unwrap();
state.initialized = true;
self.worker_db.upsert_database_instance_state(state).unwrap();
Ok(())
}
Expand Down

0 comments on commit f1406e4

Please sign in to comment.