Skip to content

Commit

Permalink
One-off query support in SpacetimeDB core.
Browse files Browse the repository at this point in the history
  • Loading branch information
kazimuth committed Sep 5, 2023
1 parent a1e9984 commit ad6e4ae
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 23 deletions.
34 changes: 34 additions & 0 deletions crates/client-api-messages/protobuf/client_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ message Message {
IdentityToken identityToken = 5;
// client -> database, register SQL queries on which to receive updates.
Subscribe subscribe = 6;
// client -> database, send a one-off SQL query without establishing a subscription.
OneOffQuery oneOffQuery = 7;
// database -> client, return results to a one off SQL query.
OneOffQueryResponse oneOffQueryResponse = 8;
}
}

Expand Down Expand Up @@ -191,3 +195,33 @@ message TransactionUpdate {
Event event = 1;
SubscriptionUpdate subscriptionUpdate = 2;
}

/// A one-off query submission.
///
/// Query should be a "SELECT * FROM Table WHERE ...". Other types of queries will be rejected.
/// Multiple such semicolon-delimited queries are allowed.
///
/// One-off queries are identified by a client-generated messageID.
/// To avoid data leaks, the server will NOT cache responses to messages based on UUID!
/// It also will not check for duplicate IDs. They are just a way to match responses to messages.
message OneOffQuery {
bytes messageId = 1;
string queryString = 2;
}

/// A one-off query response.
/// Will contain either one error or multiple response rows.
/// At most one of these messages will be sent in reply to any query.
///
/// The messageId will be identical to the one sent in the original query.
message OneOffQueryResponse {
bytes messageId = 1;
string error = 2;
repeated OneOffTable tables = 3;
}

/// A table included as part of a one-off query.
message OneOffTable {
string tableName = 2;
repeated bytes row = 4;
}
21 changes: 20 additions & 1 deletion crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::worker_metrics::{CONNECTED_CLIENTS, WEBSOCKET_SENT, WEBSOCKET_SENT_MS
use futures::prelude::*;
use tokio::sync::mpsc;

use super::messages::ServerMessage;
use super::messages::{OneOffQueryResponseMessage, ServerMessage};
use super::{message_handlers, ClientActorId, MessageHandleError};

#[derive(PartialEq, Eq, Clone, Copy, Hash, Debug)]
Expand Down Expand Up @@ -168,4 +168,23 @@ impl ClientConnection {
pub fn subscribe(&self, subscription: Subscribe) -> Result<(), NoSuchModule> {
self.module.subscription().add_subscriber(self.sender(), subscription)
}

pub async fn one_off_query(&self, query: &str, message_id: &[u8]) -> Result<(), anyhow::Error> {
let result = self.module.one_off_query(self.id.identity, query.to_owned()).await;
let message_id = message_id.to_owned();
let response = match result {
Ok(results) => OneOffQueryResponseMessage {
message_id,
error: None,
results,
},
Err(err) => OneOffQueryResponseMessage {
message_id,
error: Some(format!("{}", err)),
results: Vec::new(),
},
};
self.send_message(response).await?;
Ok(())
}
}
97 changes: 80 additions & 17 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::host::{EnergyDiff, ReducerArgs, Timestamp};
use crate::identity::Identity;
use crate::protobuf::client_api::{message, FunctionCall, Message, Subscribe};
use crate::worker_metrics::{WEBSOCKET_REQUESTS, WEBSOCKET_REQUEST_MSG_SIZE};
use base64::Engine;
use bytes::Bytes;
use bytestring::ByteString;
use prost::Message as _;
Expand All @@ -20,6 +21,8 @@ pub enum MessageHandleError {
InvalidMessage,
#[error(transparent)]
TextDecode(#[from] serde_json::Error),
#[error(transparent)]
Base64Decode(#[from] base64::DecodeError),

#[error(transparent)]
Execution(#[from] MessageExecutionError),
Expand Down Expand Up @@ -53,6 +56,10 @@ async fn handle_binary(client: &ClientConnection, message_buf: Vec<u8>) -> Resul
DecodedMessage::Call { reducer, args }
}
Some(message::Type::Subscribe(subscription)) => DecodedMessage::Subscribe(subscription),
Some(message::Type::OneOffQuery(ref oneoff)) => DecodedMessage::OneOffQuery {
query_string: &oneoff.query_string[..],
message_id: &oneoff.message_id[..],
},
_ => return Err(MessageHandleError::InvalidMessage),
};

Expand All @@ -61,27 +68,50 @@ async fn handle_binary(client: &ClientConnection, message_buf: Vec<u8>) -> Resul
Ok(())
}

async fn handle_text(client: &ClientConnection, message: String) -> Result<(), MessageHandleError> {
#[derive(serde::Deserialize)]
enum Message<'a> {
#[serde(rename = "call")]
Call {
#[serde(borrow, rename = "fn")]
func: std::borrow::Cow<'a, str>,
args: &'a serde_json::value::RawValue,
},
#[serde(rename = "subscribe")]
Subscribe { query_strings: Vec<String> },
}
#[derive(serde::Deserialize)]
enum RawJsonMessage<'a> {
#[serde(rename = "call")]
Call {
#[serde(borrow, rename = "fn")]
func: std::borrow::Cow<'a, str>,
args: &'a serde_json::value::RawValue,
},
#[serde(rename = "subscribe")]
Subscribe { query_strings: Vec<String> },
#[serde(rename = "one_off_query")]
OneOffQuery {
#[serde(borrow)]
query_string: std::borrow::Cow<'a, str>,

/// A base64-encoded string of bytes.
#[serde(borrow)]
message_id: std::borrow::Cow<'a, str>,
},
}

async fn handle_text(client: &ClientConnection, message: String) -> Result<(), MessageHandleError> {
let message = ByteString::from(message);
let msg = serde_json::from_str::<Message>(&message)?;
let msg = serde_json::from_str::<RawJsonMessage>(&message)?;
let mut message_id_ = Vec::new();
let msg = match msg {
Message::Call { ref func, args } => {
RawJsonMessage::Call { ref func, args } => {
let args = ReducerArgs::Json(message.slice_ref(args.get()));
DecodedMessage::Call { reducer: func, args }
}
Message::Subscribe { query_strings } => DecodedMessage::Subscribe(Subscribe { query_strings }),
RawJsonMessage::Subscribe { query_strings } => DecodedMessage::Subscribe(Subscribe { query_strings }),
RawJsonMessage::OneOffQuery {
query_string: ref query,
message_id,
} => {
let _ = std::mem::replace(
&mut message_id_,
base64::engine::general_purpose::STANDARD.decode(&message_id[..])?,
);
DecodedMessage::OneOffQuery {
query_string: &query[..],
message_id: &message_id_[..],
}
}
};

msg.handle(client).await?;
Expand All @@ -90,8 +120,15 @@ async fn handle_text(client: &ClientConnection, message: String) -> Result<(), M
}

enum DecodedMessage<'a> {
Call { reducer: &'a str, args: ReducerArgs },
Call {
reducer: &'a str,
args: ReducerArgs,
},
Subscribe(Subscribe),
OneOffQuery {
query_string: &'a str,
message_id: &'a [u8],
},
}

impl DecodedMessage<'_> {
Expand All @@ -102,6 +139,10 @@ impl DecodedMessage<'_> {
res.map(drop).map_err(|e| (Some(reducer), e.into()))
}
DecodedMessage::Subscribe(subscription) => client.subscribe(subscription).map_err(|e| (None, e.into())),
DecodedMessage::OneOffQuery {
query_string: query,
message_id,
} => client.one_off_query(query, message_id).await.map_err(|err| (None, err)),
};
res.map_err(|(reducer, err)| MessageExecutionError {
reducer: reducer.map(str::to_owned),
Expand All @@ -111,7 +152,7 @@ impl DecodedMessage<'_> {
}
}

/// An error that arises from
/// An error that arises from executing a message.
#[derive(thiserror::Error, Debug)]
#[error("error executing message (reducer: {reducer:?}) (err: {err:?})")]
pub struct MessageExecutionError {
Expand Down Expand Up @@ -154,3 +195,25 @@ impl ServerMessage for MessageExecutionError {
.serialize_binary()
}
}

#[cfg(test)]
mod tests {
use super::RawJsonMessage;

#[test]
fn parse_one_off_query() {
let message = r#"{ "one_off_query": { "message_id": "ywS3WFquDECZQ0UdLZN1IA==", "query_string": "SELECT * FROM User WHERE name != 'bananas'" } }"#;
let parsed = serde_json::from_str::<RawJsonMessage>(message).unwrap();

if let RawJsonMessage::OneOffQuery {
query_string: query,
message_id,
} = parsed
{
assert_eq!(query, "SELECT * FROM User WHERE name != 'bananas'");
assert_eq!(message_id, "ywS3WFquDECZQ0UdLZN1IA==");
} else {
panic!("wrong variant")
}
}
}
56 changes: 55 additions & 1 deletion crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use base64::Engine;
use prost::Message as _;
use spacetimedb_client_api_messages::client_api::{OneOffQueryResponse, OneOffTable};
use spacetimedb_lib::relation::MemTable;

use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent};
use crate::identity::Identity;
use crate::json::client_api::{EventJson, FunctionCallJson, IdentityTokenJson, MessageJson, TransactionUpdateJson};
use crate::json::client_api::{
EventJson, FunctionCallJson, IdentityTokenJson, MessageJson, OneOffQueryResponseJson, OneOffTableJson,
TransactionUpdateJson,
};
use crate::protobuf::client_api::{event, message, Event, FunctionCall, IdentityToken, Message, TransactionUpdate};

use super::{DataMessage, Protocol};
Expand Down Expand Up @@ -177,3 +183,51 @@ where
}
}
}

pub struct OneOffQueryResponseMessage {
pub message_id: Vec<u8>,
pub error: Option<String>,
pub results: Vec<MemTable>,
}

impl ServerMessage for OneOffQueryResponseMessage {
fn serialize_text(self) -> MessageJson {
MessageJson::OneOffQueryResponse(OneOffQueryResponseJson {
message_id_base64: base64::engine::general_purpose::STANDARD.encode(self.message_id),
error: self.error,
result: self
.results
.into_iter()
.map(|table| OneOffTableJson {
table_name: table.head.table_name,
rows: table.data.into_iter().map(|row| row.elements).collect(),
})
.collect(),
})
}

fn serialize_binary(self) -> Message {
Message {
r#type: Some(message::Type::OneOffQueryResponse(OneOffQueryResponse {
message_id: self.message_id,
error: self.error.unwrap_or_default(),
tables: self
.results
.into_iter()
.map(|table| OneOffTable {
table_name: table.head.table_name,
row: table
.data
.into_iter()
.map(|row| {
let mut row_bytes = Vec::new();
row.encode(&mut row_bytes);
row_bytes
})
.collect(),
})
.collect(),
})),
}
}
}
18 changes: 18 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,24 @@ impl RelationalDB {
self.finish_tx(tx, res)
}

/// Run a fallible function in a transaction.
///
/// This is similar to `with_auto_commit`, but regardless of the return value of
/// the fallible function, the transaction will ALWAYS be rolled back. This can be used to
/// emulate a read-only transaction.
///
/// TODO(jgilles): when we support actual read-only transactions, use those here instead.
pub fn with_read_only<F, A, E>(&self, f: F) -> Result<A, E>
where
F: FnOnce(&mut MutTxId) -> Result<A, E>,
E: From<DBError>,
{
let mut tx = self.begin_tx();
let res = f(&mut tx);
self.rollback_tx(tx);
res
}

/// Perform the transactional logic for the `tx` according to the `res`
pub fn finish_tx<A, E>(&self, tx: MutTxId, res: Result<A, E>) -> Result<A, E>
where
Expand Down
31 changes: 31 additions & 0 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::protobuf::client_api::{table_row_operation, SubscriptionUpdate, Table
use crate::subscription::module_subscription_actor::ModuleSubscriptionManager;
use base64::{engine::general_purpose::STANDARD as BASE_64_STD, Engine as _};
use indexmap::IndexMap;
use spacetimedb_lib::relation::MemTable;
use spacetimedb_lib::{ReducerDef, TableDef};
use spacetimedb_sats::{ProductValue, Typespace, WithTypespace};
use std::collections::HashMap;
Expand Down Expand Up @@ -213,6 +214,11 @@ enum ModuleHostCommand {
log_level: LogLevel,
message: String,
},
OneOffQuery {
caller_identity: Identity,
query: String,
respond_to: oneshot::Sender<Result<Vec<MemTable>, DBError>>,
},
}

impl ModuleHostCommand {
Expand All @@ -237,6 +243,11 @@ impl ModuleHostCommand {
log_level,
message,
} => actor.inject_logs(respond_to, log_level, message),
ModuleHostCommand::OneOffQuery {
caller_identity,
query,
respond_to,
} => actor.one_off_query(caller_identity, respond_to, query),
}
}
}
Expand Down Expand Up @@ -272,6 +283,12 @@ pub trait ModuleHostActor: Send + 'static {
fn init_database(&mut self, args: ArgsTuple, respond_to: oneshot::Sender<Result<ReducerCallResult, anyhow::Error>>);
fn update_database(&mut self, respond_to: oneshot::Sender<Result<UpdateDatabaseResult, anyhow::Error>>);
fn inject_logs(&self, respond_to: oneshot::Sender<()>, log_level: LogLevel, message: String);
fn one_off_query(
&self,
caller_identity: Identity,
respond_to: oneshot::Sender<Result<Vec<MemTable>, DBError>>,
query: String,
);
fn close(self);
}

Expand Down Expand Up @@ -484,6 +501,20 @@ impl ModuleHost {
.await
}

pub async fn one_off_query(
&self,
caller_identity: Identity,
query: String,
) -> Result<Vec<MemTable>, anyhow::Error> {
Ok(self
.call(|respond_to| ModuleHostCommand::OneOffQuery {
caller_identity,
query,
respond_to,
})
.await??)
}

pub fn downgrade(&self) -> WeakModuleHost {
WeakModuleHost {
info: self.info.clone(),
Expand Down
Loading

0 comments on commit ad6e4ae

Please sign in to comment.