Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native networking layer #38

Open
4 tasks
tantaman opened this issue Nov 14, 2022 · 6 comments
Open
4 tasks

Native networking layer #38

tantaman opened this issue Nov 14, 2022 · 6 comments

Comments

@tantaman
Copy link
Collaborator

tantaman commented Nov 14, 2022

We'll start by creating a new set of primitives for networking atop the existing crsql_changes table. This will be a higher level set of primitives that make authoring network layers simpler.

The new primitives are:

  1. InboundStream
  2. OutboundStream

An inbound stream represents a stream of changes coming from a remote database into the local database.

An outbound stream is a stream of changes coming from the local database to a remote.

These streams will be implemented as virtual tables. Rather than being eponymous virtual tables (crsql_changes is eponymous), these will be virtual tables that can be instantiated.

E.g.,

CREATE VIRTUAL TABLE out_to_peer_a USING crsql_outbound_stream(remote = peer_a_id);
CREATE VIRTUAL TABLE in_from_peer_a USING crsql_inbound_stream(remote = peer_a_id);

The Inbound/Outbound streams will assume in-order delivery (primitives for out of order deliver can follow later). The Inbound/Outbound streams will manage bookkeeping for the stream to ensure changes are sent and received in-order and that streams can be resumed as well as deliver incremental changes.

Tasks:

  1. Create a stub virtual table for crsql_outbound_stream. Just the wiring, don't worry about implementation yet
    1. Here is an example of creating a vtab in Rust: https://github.com/vlcn-io/cr-sqlite/blob/main/core/rs/core/src/create_cl_set_vtab.rs
  2. Repeat for crsql_inbound_stream
  3. Implement the ability to SELECT from the outbound stream. See select from outbound stream
  4. Implement the ability to INSERT into the inbound stream

Select From Outbound Stream

SELECT change, cursor FROM outbound_stream WHERE cursor > :last_cursor

The outbound stream is only queryable by cursor. The change column contains all values that describe the change, packed together in a binary format. We can re-use this logic for packing columns: https://github.com/vlcn-io/cr-sqlite/blob/main/core/rs/core/src/pack_columns.rs

The underlying implementation delegates to crsql_changes

Persist the Outbound Stream Position

Provide hidden columns for last_sent and last_retrieved cursors? These columns allow resumption of an outbound stream if the connection closes, process restarts, peer returns an error, etc.

Insert Into Inbound Stream

INSERT INTO inbound_stream (change, cursor) VALUES (?, ?);

todo

@tantaman
Copy link
Collaborator Author

tantaman commented Dec 8, 2022

#94 is our first pass at a production grade typescript networking layer.

@tantaman
Copy link
Collaborator Author

Current thoughts are that the native networking layer will be written in Rust (given cr-sqlite is being migrated to Rust).

It'll support websocket to work with the existing server implementation.

For p2p, I'll need to investigate libp2p and other options.

@tantaman
Copy link
Collaborator Author

This may be resolved by merging with sqld & libsql and adding the required endpoints for merging. tbd.

@tantaman
Copy link
Collaborator Author

tantaman commented Aug 8, 2023

ok.. so after writing 5 different implementations 😬 I think we're ready to do a native one.

Background

The complexity for a networking layer for cr-sqlite comes from the fact that they are so many possible ways to sync. You can sync client-server or p2p. You can sync with in-order delivery or out-of-order delivery. You can combine those different methods.

Also -- since cr-sqlite is embedded -- people generally want to run their networking layer over their existing APIs or in their existing servers. E.g., adding endpoints to their rest APIs, adding message types to their existing websocket server, using their same nodejs/netty/tokio server to sync, etc.

Traditional DBs don't have this problem given they aren't embedded.

Most setups I've run into are fine with in-order delivery. fly.io is the one use case that uses out of order delivery but they've already rolled their own networking layer which will eventually be open sourced.

Plan

Given :

  • The DB is embedded and people will want to push changes out over whatever transport they're using.
  • Most use cases want/need in-order delivery
  • The current primitives of cr-sqlite are way too low level for people to easily write their own sync servers.

My plan is to provide inbound and outbound stream abstractions, in native code, to vastly simplify network layer creation.

Current Concerns

When someone writes their own networking layer they must track:

  • Last seen versions of peers for in-order delivery
  • Acks for backpressure and/or rolling a stream back
  • Database change notifications
  • Paginating large transactions
  • Paginating large syncs on initial sync
  • Errors to roll back streams
  • etc.

Inbound and Outbound stream will deal with these for the developer and be exposed as new sqlite methods such that they can be embedded into and invoked from any language the user desires.

OutboundStream

This is an abstraction over SELECT * FROM crsql_changes WHERE ... that, in addition to pulling changes, does all the required bookkeeping to ensure.

Potential API:

// create the stream
stream = db.create_outbound_stream(since_version: [version, seq], excludeSites?, localOnly?);

// advance the stream
stream.next()

Creating the stream, the user specifies:

  1. The version at which to begin the stream
  2. Optional set of sites to exclude from the change log. Usually this would be the receiving site id.
  3. Whether or not to only include local changes

The user then advances the stream at their leisure. This is to allow users with realtime use cases to sync often and users with different use cases to sync less frequently.

next will pull all changes that have happened since the last time next was called and return them to the caller.

So this isn't quite a networking abstraction -- the user will still need to pass those changes over their own transport.

One thing to work out is how to handle if next is a very large chunk. next should either:

  • take a step size
  • or return another iterator to iterate over the chunk

Also -- the user would want to be able to ack the next so that the internal pointer in the stream is not updated until the changes are sent over the wire.

Given that, next should return some sort of object for:

  1. Getting the current grouping of changes pointed at
  2. Acking / finalizing that set so bookkeeping can happen

InboundStream

stream = db.create_inbound_stream(from: site_id);
stream.applyChanges(results_from_outbound_stream);

Given the rate of sending is controlled by the sender, there isn't much to do with InboundStream other than create it and stop it when desired.

The output of an OutboundStream is meant to be fed directly into an InboundStream. The inbound stream will ensure that received changes are contiguous and do the bookkeeping so it can be restarted at future points in time.

Payloads

Inbound & Outbound stream payloads will be serialized as opaque binary blobs. The user shouldn't need to interact with them other than to send them over a network connection.

Some transports do not support binary so in those cases we should allow for JSON encoding the payloads as a configuration option of inbound/outbound stream.

@tantaman
Copy link
Collaborator Author

tantaman commented Aug 8, 2023

Once the Inbound and Outbound stream abstractions are in place I'll update one or more of the PartyKit, P2PRTC, direct-connect examples as transports and then decide on what a default native transport would look like.

@tantaman
Copy link
Collaborator Author

tantaman commented Aug 14, 2023

These new primitives (stateful Inbound and Outbound streams) could be exposed over SQL so as to not require custom bindings.

I'm thinking as virtual tables.

CREATE VIRTUAL TABLE stream_out_to_[server_id] USING crsql_OutboundStream (
  since = ..., -- exposed in case they don't want to start from zero 🤷‍♂️
  exclude_sites = [server_id]
);

SELECT package, cursor FROM stream_out_to_[server_id] WHERE cursor > ?
  • cursor will be [version, seq] pairings
  • package will be all the data required for transmission pre-packed into a blob.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant