Commit

Merge pull request #7 from voxoco/refactor-ephemeral-consumer
refactor to simplify and use ephemeral consumer
jmordica committed Jan 4, 2023
2 parents 79f5f9c + ae875a7 commit 07f5a85
Showing 6 changed files with 152 additions and 159 deletions.
2 changes: 0 additions & 2 deletions Dockerfile
@@ -1,7 +1,5 @@
FROM denoland/deno:alpine

RUN apk add --no-cache tzdata

EXPOSE 4001

WORKDIR /app
48 changes: 25 additions & 23 deletions README.md
@@ -20,10 +20,9 @@ at the edge like:

- Simple HTTP API for interacting with SQLite via `db/query`
- Snapshotting/restore out of the box (using
[KV](https://docs.nats.io/using-nats/developer/develop_jetstream/kv) and
[Object Store](https://docs.nats.io/using-nats/developer/develop_jetstream/object))
- NATS JetStream for SQL replication and persistence (via
[Durable Push Consumer](https://docs.nats.io/running-a-nats-service/nats_admin/jetstream_admin/consumers))
[Ephemeral Push Consumer](https://docs.nats.io/using-nats/developer/develop_jetstream/consumers))
- Lightweight, easy-to-use - just run the binary and pass in the NATS Websocket
URL
- Deploy anywhere - Linux, macOS, Windows, ARM, Edge, Kubernetes, Docker, etc.
@@ -41,7 +40,7 @@ it needs is a connection to NATS.

- NATS JetStream with at-least once delivery is a great fit for SQL replication
and persistence
- NATS KV and Object Store is a great fit for snapshotting/restore
- Object Store is a great fit for snapshotting/restore
- Who doesn't already use NATS for pub/sub?
- The need for a dead simple edge relational database closer to the application
- **Database nodes don't need to be aware of each other**
@@ -108,30 +107,32 @@ $ deno run -A --unstable https://deno.land/x/nqlite/main.ts --wshost=wss://FQDN

## How it works

nqlite is a Deno application that connects to NATS JetStream. It takes care of
bootstrapping itself and creating the necessary JetStream streams, consumers,
kv, and object store. It also takes care of snapshotting and restoring the
SQLite db.
nqlite is a Deno application that connects to NATS JetStream. It bootstraps
itself by creating the necessary JetStream streams, consumers, and object store.
It also takes care of snapshotting and restoring the SQLite db. When a node
starts up, it checks the object store for an SQLite snapshot. If it finds one,
it restores from the snapshot. If not, it creates a new SQLite db. The node then
subscribes to the JetStream stream at the last sequence number processed by the
SQLite db. Anytime the node receives a query via the stream, it executes the
query and updates the `_nqlite_` table with the sequence number. Read requests
are handled locally. Read more below about snapshotting and purging.
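
The startup and catch-up flow described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `StreamMessage`, `NodeState`, `bootstrap`, and `catchUp` are hypothetical names that do not exist in nqlite, the array stands in for the JetStream stream, and `lastSeq` stands in for the `seq` value stored in the `_nqlite_` table.

```typescript
// Hypothetical in-memory model; none of these names come from nqlite itself.
interface StreamMessage {
  seq: number; // JetStream stream sequence (starts at 1)
  sql: string; // the write query carried by the message
}

interface NodeState {
  lastSeq: number; // stands in for the `seq` column of the `_nqlite_` table
  applied: string[]; // stands in for the contents of the SQLite db
}

// Restore from a snapshot if one exists, otherwise start with an empty db.
function bootstrap(snapshot: NodeState | null): NodeState {
  return snapshot
    ? { lastSeq: snapshot.lastSeq, applied: [...snapshot.applied] }
    : { lastSeq: 0, applied: [] };
}

// Apply every message after the last processed sequence and record progress.
function catchUp(node: NodeState, stream: StreamMessage[]): NodeState {
  const startSeq = node.lastSeq + 1;
  for (const m of stream) {
    if (m.seq < startSeq) continue; // already reflected in the restored db
    node.applied.push(m.sql);
    node.lastSeq = m.seq;
  }
  return node;
}
```

In this model a node restored from a snapshot taken at sequence 2 only applies messages from sequence 3 onward, while a fresh node replays the stream from the beginning.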

### Built-in configuration

```bash
# Data directory
./.data
# Default Data directory
./.data/nqlite

# SQLite file
./.data/nqlite.db

# Node UID generated on first run (used for durable consumer)
./.data/uid
./.data/nqlite/nqlite.db

# NATS JetStream stream
nqlite

# NATS JetStream publish subject
nqlite.push

# NATS kv and object store bucket
# NATS object store bucket
nqlite

# Snapshot interval hours (check how often to snapshot the db)
@@ -145,17 +146,18 @@ nqlite

### Snapshot and purging

Every `snapInterval` nqlite gets the latest snapshot kv key and compares it with
the last processed sequence number from JetStream. If the node has processed
more than `snapThreshold` messages since the kv snapshot sequence, the node
unsubscribes from the stream and attempts a snapshot.
Every `snapInterval` nqlite gets the latest snapshot sequence from object store
description and compares it with the last processed sequence number from
JetStream. If the node has processed more than `snapThreshold` messages since
the object store snapshot sequence, the node unsubscribes from the stream and
attempts a snapshot.

The node backs up the SQLite db to object store and sets the latest processed
sequence number to the snapshot key in kv and purges all previous messages from
the stream. The node then resumes the JetStream subscription. The nice thing
here is that JetStream will continue pushing messages to the iterator from
where it left off (so the node doesn’t miss any db changes and eventually
catches back up).
sequence number to the object store `description` and purges all previous
messages from the stream (older than `snapSequence - snapThreshold`). The node
then resumes the JetStream subscription. The nice thing here is that JetStream
will continue pushing messages to the iterator from where it left off (so the
node doesn’t miss any db changes and eventually catches back up).
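
The threshold check and the purge point described above come down to two small calculations. The sketch below is an assumption-laden illustration: `shouldSnapshot` and `purgeSequence` are hypothetical helper names, not part of nqlite. The comparison mirrors the `mod.ts` change in this commit, which skips the snapshot only when fewer than `snapThreshold` messages have been processed since the last one.

```typescript
// Hypothetical helpers; the names are illustrative and not nqlite's API.

// Decide whether enough messages were processed since the last snapshot.
// `lastSnapSeq` is null when the object store holds no snapshot yet.
function shouldSnapshot(
  currentSeq: number,
  lastSnapSeq: number | null,
  snapThreshold: number,
): boolean {
  const processed = currentSeq - (lastSnapSeq ?? 0);
  return processed >= snapThreshold;
}

// Sequence handed to the stream purge: snapshot sequence minus the
// threshold, clamped so it never goes below 0.
function purgeSequence(snapSeq: number, snapThreshold: number): number {
  return Math.max(0, snapSeq - snapThreshold);
}
```

With the default `snapThreshold` of 1024, a node at sequence 3000 whose last snapshot was taken at 2000 would skip the snapshot, and a snapshot taken at sequence 3000 would purge up to sequence 1976.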

The other nice thing about this setup is that we can still accept writes on this
node while the snapshotting is taking place. So the only sacrifice we make here
6 changes: 0 additions & 6 deletions main.ts
@@ -14,12 +14,6 @@ const flags = parse(Deno.args, {
},
});

// Handle SIGINT
Deno.addSignalListener("SIGINT", () => {
console.log("About to die!");
Deno.exit();
});

const showHelp = () => {
console.log("Usage: ./nqlite [options]");
console.log(" --help, -h: Show this help");
166 changes: 83 additions & 83 deletions mod.ts
@@ -1,38 +1,33 @@
import {
Codec,
ConsumerOptsBuilder,
consumerOpts,
createInbox,
JetStreamClient,
JetStreamManager,
JetStreamSubscription,
KV,
NatsConnection,
ObjectStore,
PubAck,
StringCodec,
} from "natsws";
import { serve } from "serve";
import { Context, Hono } from "hono";
import { nats, restore, snapshot } from "./util.ts";
import { bootstrapDataDir, dbSetup, nats, restore, snapshot } from "./util.ts";
import { Database } from "sqlite3";
import { NatsInit, NatsRes, Options, ParseRes, Res } from "./types.ts";
import { NatsRes, Options, ParseRes, Res } from "./types.ts";
import { parse } from "./parse.ts";

export class Nqlite {
dataDir!: string;
dbFile!: string;
nc!: NatsConnection;
sc: Codec<string> = StringCodec();
app: string;
js!: JetStreamClient;
db!: Database;
os!: ObjectStore;
kv!: KV;
subject: string;
sub!: JetStreamSubscription;
opts!: ConsumerOptsBuilder;
snapInterval: number;
snapThreshold: number;
seq: number;
jsm!: JetStreamManager;

// Create a constructor
@@ -41,50 +36,27 @@ export class Nqlite {
this.subject = `${this.app}.push`;
this.snapInterval = 2;
this.snapThreshold = 1024;
this.seq = 0;
}

// Init function to connect to NATS
async init(opts: Options): Promise<void> {
const { url, creds, token, dataDir } = opts;
// Make sure directory exists
this.dataDir = dataDir;

this.dataDir = `${dataDir}/${this.app}`;
this.dbFile = `${this.dataDir}/nqlite.db`;
await Deno.mkdir(this.dataDir, { recursive: true });

// NATS connection options
const conf = {
url,
app: this.app,
dataDir: this.dataDir,
creds,
token,
} as NatsInit;
// Bootstrap the dataDir
await bootstrapDataDir(this.dataDir);

// Initialize NATS
const res: NatsRes = await nats(conf);
({
js: this.js,
os: this.os,
kv: this.kv,
opts: this.opts,
jsm: this.jsm,
lastSeq: this.seq,
} = res);

// Restore from snapshot if needed
await restore(this.os, this.dbFile);
const res: NatsRes = await nats({ url, app: this.app, creds, token });
({ js: this.js, os: this.os, jsm: this.jsm } = res);

// Connect to the database
this.db = new Database(this.dbFile);
this.db.exec("pragma journal_mode = WAL");
this.db.exec("pragma synchronous = normal");
this.db.exec("pragma temp_store = memory");
// Restore from snapshot if exists
await restore(this.os, this.dbFile);

const version = this.db.prepare("select sqlite_version()").value<
[string]
>()!;
console.log(`SQLite version: ${version}`);
// Set up the database
this.db = dbSetup(this.dbFile);

// Setup the API
this.http();
@@ -93,8 +65,26 @@ export class Nqlite {
this.snapshotPoller();

// Start iterating over the messages in the stream
this.sub = await this.js.subscribe(this.subject, this.opts);
this.iterator(this.sub);
await this.consumer();

// Handle SIGINT
Deno.addSignalListener("SIGINT", async () => {
console.log("About to die!. Draining subscription");
await this.sub.drain();
console.log("Closing the database");
this.db.close();
Deno.exit();
});
}

// Get the latest sequence number
getSeq(): number {
return this.db.prepare(`SELECT seq FROM _nqlite_`).get()!.seq as number;
}

// Set the latest sequence number
setSeq(seq: number): void {
this.db.prepare(`UPDATE _nqlite_ SET seq = ? where id = 1`).run(seq);
}

// Execute a statement
@@ -142,6 +132,21 @@ export class Nqlite {
return res;
}

// Setup ephemeral consumer
async consumer(): Promise<void> {
// Get the latest sequence number
const opts = consumerOpts();
opts.manualAck();
opts.ackExplicit();
opts.maxAckPending(10);
opts.deliverTo(createInbox());
const seq = this.getSeq() + 1;
opts.startSequence(seq);
console.log(`Starting consumer at seq: ${seq}`);
this.sub = await this.js.subscribe(this.subject, opts);
this.iterator(this.sub);
}

// Publish a message to NATS
async publish(s: ParseRes): Promise<Res> {
const res: Res = { results: [{}], time: 0 };
@@ -167,28 +172,25 @@ export class Nqlite {
async iterator(sub: JetStreamSubscription) {
for await (const m of sub) {
console.log(`Received sequence #: ${m.seq}`);
const data = JSON.parse(this.sc.decode(m.data));

try {
const res = parse(
JSON.parse(this.sc.decode(m.data)),
performance.now(),
);
const res = parse(data, performance.now());

// Handle errors
if (res.error) {
console.log("Parse error:", res.error);
m.ack();
this.seq = m.seq;
continue;
}

this.execute(res);
m.ack();
this.seq = m.seq;
} catch (e) {
console.log(e);
m.ack();
this.seq = m.seq;
console.log("Execute error: ", e.message, "Query: ", data);
}

m.ack();
this.setSeq(m.seq);
}
}

@@ -201,54 +203,52 @@ export class Nqlite {
setTimeout(resolve, this.snapInterval * 60 * 60 * 1000)
);

// Get the last snapshot sequence from the kv store
try {
// First VACUUM the database to free up space
this.db.exec("VACUUM");

const e = await this.kv.get("snapshot");
const lastSnapSeq = e ? Number(this.sc.decode(e.value)) : 0;
console.log(`Last snapshot sequence: ${lastSnapSeq}`);
// Check object store for snapshot
const snapInfo = await this.os.info(this.app);

// Check if we need to snapshot
if (this.seq - lastSnapSeq < this.snapThreshold) {
if (snapInfo) {
const processed = this.getSeq() - Number(snapInfo.description);
if (processed < this.snapThreshold) {
console.log(
`Skipping snapshot, threshold not met: ${processed} < ${this.snapThreshold}`,
);
continue;
}
}

if (!snapInfo && this.getSeq() < this.snapThreshold) {
console.log(
`Skipping snapshot, sequence number threshold not met: ${
this.seq - lastSnapSeq
} < ${this.snapThreshold}`,
`Skipping snapshot, threshold not met: ${this.getSeq()} < ${this.snapThreshold}`,
);
continue;
}

// Unsubscribe from the stream so we stop receiving db updates
this.sub.unsubscribe();
console.log("Unsubscribed from stream");
await this.sub.drain();
console.log("Drained subscription...");

// Snapshot the database to object store
if (!await snapshot(this.os, this.dbFile)) {
console.log("Error during snapshot");
this.sub = await this.js.subscribe(this.subject, this.opts);
this.iterator(this.sub);
continue;
let seq = this.getSeq();
if (await snapshot(this.os, this.dbFile, seq)) {
// Purge previous messages from the stream older than seq - snapThreshold
seq = seq - this.snapThreshold;
await this.jsm.streams.purge(this.app, {
filter: this.subject,
seq: seq < 0 ? 0 : seq,
});
}

// Update the kv store with the last snapshot sequence number
await this.kv.put("snapshot", this.sc.encode(String(this.seq)));
console.log(`Updated snapshot kv sequence to ${this.seq}`);

// Purge messages from the stream older than this.seq + 1
await this.jsm.streams.purge(this.app, {
filter: this.subject,
seq: this.seq + 1,
});
} catch (e) {
console.log("Error during snapshot polling", e);
console.log("Error during snapshot polling:", e.message);
}

// Resubscribe to the stream
console.log("Resubscribing to stream");
this.sub = await this.js.subscribe(this.subject, this.opts);
this.iterator(this.sub);
console.log(`Subscribing to stream after snapshot attempt`);
await this.consumer();
}
}
