Skip to content

Commit

Permalink
Revert "[ABI] Remove the special first element of iterator (#420)"
Browse files Browse the repository at this point in the history
This reverts commit 469dff6.
  • Loading branch information
cloutiertyler committed Oct 18, 2023
1 parent 5864bfa commit d1ce99b
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 32 deletions.
12 changes: 7 additions & 5 deletions crates/bindings-csharp/Runtime/Runtime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,20 @@ public void Reset()

public class RawTableIter : IEnumerable<byte[]>
{
private readonly uint tableId;
private readonly byte[]? filterBytes;
public readonly byte[] Schema;

private readonly IEnumerator<byte[]> iter;

public RawTableIter(uint tableId, byte[]? filterBytes = null)
{
this.tableId = tableId;
this.filterBytes = filterBytes;
iter = new BufferIter(tableId, filterBytes);
iter.MoveNext();
Schema = iter.Current;
}

public IEnumerator<byte[]> GetEnumerator()
{
return new BufferIter(tableId, filterBytes);
return iter;
}

IEnumerator IEnumerable.GetEnumerator()
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings-csharp/Runtime/bindings.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static MonoArray* stdb_buffer_consume(Buffer buf);
// return out;
// }

#define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_7." #minor
#define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_6." #minor
#define STDB_IMPORT_MODULE STDB_IMPORT_MODULE_MINOR(0)

__attribute__((import_module(STDB_IMPORT_MODULE),
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod raw {
// on. Any non-breaking additions to the abi surface should be put in a new `extern {}` block
// with a module identifier with a minor version 1 above the previous highest minor version.
// For breaking changes, all functions should be moved into one new `spacetime_X.0` block.
#[link(wasm_import_module = "spacetime_7.0")]
#[link(wasm_import_module = "spacetime_6.0")]
extern "C" {
/*
/// Create a table with `name`, a UTF-8 slice in WASM memory lasting `name_len` bytes,
Expand Down
42 changes: 31 additions & 11 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,34 @@ pub fn delete_range(table_id: u32, col_id: u8, range: Range<AlgebraicValue>) ->
//
// }

// Get the buffer iterator for this table,
// with an optional filter,
// and return it and its decoded `ProductType` schema.
fn buffer_table_iter(
table_id: u32,
filter: Option<spacetimedb_lib::filter::Expr>,
) -> Result<(BufferIter, ProductType)> {
// Decode the filter, if any.
let filter = filter
.as_ref()
.map(bsatn::to_vec)
.transpose()
.expect("Couldn't decode the filter query");

// Create the iterator.
let mut iter = sys::iter(table_id, filter.as_deref())?;

// First item is an encoded schema.
let schema_raw = iter
.next()
.expect("Missing schema")
.expect("Failed to get schema")
.read();
let schema = decode_schema(&mut &schema_raw[..]).expect("Could not decode schema");

Ok((iter, schema))
}

/// A table iterator which yields `ProductValue`s.
// type ProductValueTableIter = RawTableIter<ProductValue, ProductValueBufferDeserialize>;

Expand All @@ -257,18 +285,10 @@ pub fn delete_range(table_id: u32, col_id: u8, range: Range<AlgebraicValue>) ->
/// A table iterator which yields values of the `TableType` corresponding to the table.
type TableTypeTableIter<T> = RawTableIter<TableTypeBufferDeserialize<T>>;

// Get the iterator for this table with an optional filter,
fn table_iter<T: TableType>(table_id: u32, filter: Option<spacetimedb_lib::filter::Expr>) -> Result<TableIter<T>> {
// Decode the filter, if any.
let filter = filter
.as_ref()
.map(bsatn::to_vec)
.transpose()
.expect("Couldn't decode the filter query");

// Create the iterator.
let iter = sys::iter(table_id, filter.as_deref())?;

// The TableType deserializer doesn't need the schema, as we have type-directed
// dispatch to deserialize any given `TableType`.
let (iter, _schema) = buffer_table_iter(table_id, filter)?;
let deserializer = TableTypeBufferDeserialize::new();
Ok(RawTableIter::new(iter, deserializer).into())
}
Expand Down
37 changes: 26 additions & 11 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,9 @@ impl BufWriter for ChunkedWriter {
}

impl ChunkedWriter {
/// Flushes the data collected in the scratch space if it's larger than our
/// chunking threshold.
pub fn flush(&mut self) {
// For now, just send buffers over a certain fixed size.
const ITER_CHUNK_SIZE: usize = 64 * 1024;

if self.scratch_space.len() > ITER_CHUNK_SIZE {
/// Flushes the currently populated part of the scratch space as a new chunk.
pub fn force_flush(&mut self) {
if !self.scratch_space.is_empty() {
// We intentionally clone here so that our scratch space is not
// recreated with zero capacity (via `Vec::new`), but instead can
// be `.clear()`ed in-place and reused.
Expand All @@ -67,11 +63,22 @@ impl ChunkedWriter {
}
}

/// Similar to [`Self::force_flush`], but only flushes if the data in the
/// scratch space is larger than our chunking threshold.
pub fn flush(&mut self) {
// For now, just send buffers over a certain fixed size.
const ITER_CHUNK_SIZE: usize = 64 * 1024;

if self.scratch_space.len() > ITER_CHUNK_SIZE {
self.force_flush();
}
}

/// Finalises the writer and returns all the chunks.
pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
if !self.scratch_space.is_empty() {
// Avoid extra clone by just shrinking and pushing the scratch space
// in-place.
// This is equivalent to calling `force_flush`, but we avoid extra
// clone by just shrinking and pushing the scratch space in-place.
self.chunks.push(self.scratch_space.into());
}
self.chunks
Expand Down Expand Up @@ -373,6 +380,10 @@ impl InstanceEnv {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.tx.get()?;

stdb.row_schema_for_table(tx, table_id)?.encode(&mut chunked_writer);
// initial chunk is expected to be schema itself, so force-flush it as a separate chunk
chunked_writer.force_flush();

for row in stdb.iter(tx, table_id)? {
row.view().encode(&mut chunked_writer);
// Flush at row boundaries.
Expand Down Expand Up @@ -413,12 +424,18 @@ impl InstanceEnv {
}
}

let mut chunked_writer = ChunkedWriter::default();

let stdb = &self.dbic.relational_db;
let tx = &mut *self.tx.get()?;

let schema = stdb.schema_for_table(tx, table_id)?;
let row_type = ProductType::from(&*schema);

// write and force flush schema as it's expected to be the first individual chunk
row_type.encode(&mut chunked_writer);
chunked_writer.force_flush();

let filter = filter::Expr::from_bytes(
// TODO: looks like module typespace is currently not hooked up to instances;
// use empty typespace for now which should be enough for primitives
Expand All @@ -436,8 +453,6 @@ impl InstanceEnv {
_ => unreachable!("query should always return a table"),
};

let mut chunked_writer = ChunkedWriter::default();

// write all rows and flush at row boundaries
for row in results.data {
row.data.encode(&mut chunked_writer);
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/wasmer/wasmer_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ impl WasmerModule {
WasmerModule { module, engine }
}

pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(7, 0);
pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(6, 0);

fn imports(&self, store: &mut Store, env: &FunctionEnv<WasmInstanceEnv>) -> Imports {
#[allow(clippy::assertions_on_constants)]
const _: () = assert!(WasmerModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION);
imports! {
"spacetime_7.0" => {
"spacetime_6.0" => {
"_schedule_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::schedule_reducer),
"_cancel_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::cancel_reducer),
"_delete_by_col_eq" => Function::new_typed_with_env(
Expand Down
2 changes: 1 addition & 1 deletion crates/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use type_value::{AlgebraicValue, ProductValue};

pub use spacetimedb_sats as sats;

pub const MODULE_ABI_MAJOR_VERSION: u16 = 7;
pub const MODULE_ABI_MAJOR_VERSION: u16 = 6;

// if it ends up we need more fields in the future, we can split one of them in two
#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Debug)]
Expand Down

0 comments on commit d1ce99b

Please sign in to comment.