Skip to content

Commit

Permalink
Re-instantiate propagating the result of updating a db back to the user
Browse files Browse the repository at this point in the history
This cannot be satisfied in legacy cloud, but potentially it can in the
next iteration of it.
  • Loading branch information
kim committed Jul 10, 2023
1 parent 2b85cc8 commit 74bfbab
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 50 deletions.
25 changes: 22 additions & 3 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use spacetimedb::address::Address;
use spacetimedb::auth::identity::{DecodingKey, EncodingKey};
use spacetimedb::client::ClientActorIndex;
use spacetimedb::database_instance_context_controller::DatabaseInstanceContextController;
use spacetimedb::host::HostController;
use spacetimedb::host::{HostController, UpdateDatabaseResult};
use spacetimedb::identity::Identity;
use spacetimedb::messages::control_db::{Database, DatabaseInstance, EnergyBalance, IdentityEmail, Node};
use spacetimedb::messages::worker_db::DatabaseInstanceState;
Expand Down Expand Up @@ -122,7 +122,21 @@ pub trait ControlStateReadAccess {
pub trait ControlStateWriteAccess: Send + Sync {
// Databases
async fn create_address(&self) -> spacetimedb::control_db::Result<Address>;
async fn publish_database(&self, identity: &Identity, spec: DatabaseDef) -> spacetimedb::control_db::Result<()>;

/// Publish a database acc. to [`DatabaseDef`].
///
/// If the database with the given address was successfully published before,
/// it is updated acc. to the module lifecycle conventions. `Some` result is
/// returned in that case.
///
/// Otherwise, `None` is returned meaning that the database was freshly
/// initialized.
async fn publish_database(
&self,
identity: &Identity,
spec: DatabaseDef,
) -> spacetimedb::control_db::Result<Option<UpdateDatabaseResult>>;

async fn delete_database(&self, identity: &Identity, address: &Address) -> spacetimedb::control_db::Result<()>;

// Identities
Expand Down Expand Up @@ -224,9 +238,14 @@ impl<T: ControlStateWriteAccess + ?Sized> ControlStateWriteAccess for ArcEnv<T>
self.0.create_address().await
}

async fn publish_database(&self, identity: &Identity, spec: DatabaseDef) -> spacetimedb::control_db::Result<()> {
async fn publish_database(
&self,
identity: &Identity,
spec: DatabaseDef,
) -> spacetimedb::control_db::Result<Option<UpdateDatabaseResult>> {
self.0.publish_database(identity, spec).await
}

async fn delete_database(&self, identity: &Identity, address: &Address) -> spacetimedb::control_db::Result<()> {
self.0.delete_database(identity, address).await
}
Expand Down
47 changes: 34 additions & 13 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use spacetimedb::host::EntityDef;
use spacetimedb::host::ReducerArgs;
use spacetimedb::host::ReducerCallError;
use spacetimedb::host::ReducerOutcome;
use spacetimedb::host::UpdateDatabaseSuccess;
use spacetimedb_lib::name;
use spacetimedb_lib::name::DomainName;
use spacetimedb_lib::name::DomainParsingError;
Expand Down Expand Up @@ -581,7 +582,6 @@ pub async fn dns<S: ControlStateDelegate>(
Query(DNSQueryParams {}): Query<DNSQueryParams>,
) -> axum::response::Result<impl IntoResponse> {
let domain = database_name.parse().map_err(DomainParsingRejection)?;
log::debug!("dns with `{domain}`");
let address = ctx.lookup_address(&domain).map_err(log_and_500)?;
let response = if let Some(address) = address {
DnsLookupResponse::Success {
Expand Down Expand Up @@ -804,20 +804,41 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
}
};

ctx.publish_database(
&auth.identity,
DatabaseDef {
address: db_addr,
program_bytes: body.into(),
num_replicas: 1,
trace_log: should_trace(trace_log),
},
)
.await
.map_err(log_and_500)?;
let maybe_updated = ctx
.publish_database(
&auth.identity,
DatabaseDef {
address: db_addr,
program_bytes: body.into(),
num_replicas: 1,
trace_log: should_trace(trace_log),
},
)
.await
.map_err(log_and_500)?;

if let Some(updated) = maybe_updated {
match updated {
Ok(success) => {
if let UpdateDatabaseSuccess {
// An update reducer was defined, and it was run
update_result: Some(update_result),
// Not yet implemented
migrate_results: _,
} = success
{
let ror = reducer_outcome_response(&auth.identity, "update", update_result.outcome);
if !matches!(ror, (StatusCode::OK, _)) {
return Err(ror.into());
}
}
}
Err(e) => return Err((StatusCode::BAD_REQUEST, format!("Database update rejected: {e}")).into()),
}
}

Ok(axum::Json(PublishResult::Success {
domain: db_name.map(|domain| domain.to_string()),
domain: db_name.as_ref().map(ToString::to_string),
address: db_addr.to_hex(),
op,
}))
Expand Down
60 changes: 27 additions & 33 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use spacetimedb::sendgrid_controller::SendGridController;
use spacetimedb::{stdb_path, worker_metrics};
use spacetimedb_lib::name::{DomainName, InsertDomainResult, RegisterTldResult, Tld};
use spacetimedb_lib::recovery::RecoveryCode;
use spacetimedb_lib::Hash;
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -271,19 +270,27 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
&self,
identity: &Identity,
spec: spacetimedb_client_api::DatabaseDef,
) -> spacetimedb::control_db::Result<()> {
) -> spacetimedb::control_db::Result<Option<UpdateDatabaseResult>> {
let existing_db = self.control_db.get_database_by_address(&spec.address)?;
let mut database = existing_db.clone().unwrap_or(Database {
id: 0,
address: spec.address,
identity: *identity,
host_type: HostType::Wasmer,
num_replicas: spec.num_replicas,
program_bytes_address: Hash::ZERO,
trace_log: spec.trace_log,
});
let addr = self.object_db.insert_object(spec.program_bytes)?;
database.program_bytes_address = addr;
let program_bytes_address = self.object_db.insert_object(spec.program_bytes)?;
let mut database = match existing_db.as_ref() {
Some(existing) => Database {
address: spec.address,
num_replicas: spec.num_replicas,
program_bytes_address,
trace_log: spec.trace_log,
..existing.clone()
},
None => Database {
id: 0,
address: spec.address,
identity: *identity,
host_type: HostType::Wasmer,
num_replicas: spec.num_replicas,
program_bytes_address,
trace_log: spec.trace_log,
},
};

if let Some(existing) = existing_db.as_ref() {
if &existing.identity != identity {
Expand All @@ -306,11 +313,14 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
self.schedule_database(Some(database), existing_db).await?;

if should_update_instances {
// TODO(kim): cannot return result in cloud nor here
let _ = self.update_database_instances(database_id).await?;
let leader = self
.control_db
.get_leader_database_instance_by_database(database_id)
.ok_or_else(|| anyhow!("Not found: leader instance for database {database_id}"))?;
Ok(self.update_database_instance(leader).await?)
} else {
Ok(None)
}

Ok(())
}

async fn delete_database(&self, identity: &Identity, address: &Address) -> spacetimedb::control_db::Result<()> {
Expand Down Expand Up @@ -452,22 +462,6 @@ impl StandaloneEnv {
Ok(())
}

// TODO(kim): update should only run on the leader instance, and this
// method should return a single result
async fn update_database_instances(
&self,
database_id: u64,
) -> Result<Vec<Option<UpdateDatabaseResult>>, anyhow::Error> {
let instances = self.control_db.get_database_instances_by_database(database_id)?;
let mut results = Vec::with_capacity(instances.len());
for instance in instances {
let res = self.update_database_instance(instance).await?;
results.push(res);
}

Ok(results)
}

async fn deschedule_replicas(&self, database_id: u64, num_replicas: u32) -> Result<(), anyhow::Error> {
for _ in 0..num_replicas {
let instances = self.control_db.get_database_instances_by_database(database_id)?;
Expand Down
2 changes: 1 addition & 1 deletion test/tests/update-module.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct Person {
#[spacetimedb(reducer)]
pub fn add(name: String) {
Person::insert(Person { id: 0, name });
let _ = Person::insert(Person { id: 0, name }).unwrap();
}
#[spacetimedb(reducer)]
Expand Down

0 comments on commit 74bfbab

Please sign in to comment.