Skip to content

Commit

Permalink
Merge pull request #9 from voxoco/tcp-support
Browse files Browse the repository at this point in the history
Tcp support
  • Loading branch information
jmordica committed Jan 7, 2023
2 parents 633f173 + 4eb6e0d commit 5eae39d
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 284 deletions.
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ $ nqlite --wshost=wss://FQDN --creds=./nats.creds
### Command line options

```bash
--wshost=wss://... - NATS Websocket URL - required
--creds=./nats.creds - NATS creds file - optional
--token=secret - NATS auth token - optional
--data-dir=/nqlite-data - Data directory - optional
--h - Help

# Pass either creds or token but not both
nqlite [options]
--nats-host=wss://... # NATS NATS host e.g 'nats://localhost:4222' || 'ws://localhost:8080' (required)

--creds=./nats.creds # NATS creds file - required if --token not provided

--token=secret # NATS auth token - required if --creds not provided

--data-dir=/nqlite-data # Data directory - optional (default: ./nqlite-data)

--h - Help
```

### Docker
Expand Down
278 changes: 93 additions & 185 deletions deno.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion import_map.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"imports": {
"sqlite3": "https://deno.land/x/sqlite3@0.6.1/mod.ts",
"sqlite3": "https://deno.land/x/sqlite3@0.7.2/mod.ts",
"natsws": "https://deno.land/x/natsws@v1.11.1/src/mod.ts",
"nats": "https://deno.land/x/nats@v1.10.2/src/mod.ts",
"serve": "https://deno.land/std@0.170.0/http/server.ts",
"hono": "https://deno.land/x/hono@v2.6.2/mod.ts"
}
Expand Down
30 changes: 20 additions & 10 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { parse } from "https://deno.land/std@0.168.0/flags/mod.ts";

const flags = parse(Deno.args, {
boolean: ["help"],
string: ["wshost", "creds", "token", "data-dir"],
string: ["nats-host", "creds", "token", "data-dir"],
alias: { h: "help" },
stopEarly: true,
default: {
help: false,
wshost: "ws://localhost:8080",
"nats-host": "",
creds: "",
token: "",
"data-dir": ".nqlite-data",
Expand All @@ -16,13 +16,17 @@ const flags = parse(Deno.args, {

const showHelp = () => {
console.log("Usage: ./nqlite [options]");
console.log(" --help, -h: Show this help");
console.log(" --help, -h: Show this help");
console.log(
" --wshost: NATS websocket server URL (default: 'ws://localhost:8080')",
" --nats-host: NATS host e.g 'nats://localhost:4222' || 'ws://localhost:8080' (required)",
);
console.log(" --token: NATS authentication token (default: none)");
console.log(" --creds: NATS credentials file (default: none)");
console.log(" --data-dir: Data directory (default: '.data'");
console.log(
" --token: NATS authentication token (required if --creds is not provided)",
);
console.log(
" --creds: NATS credentials file (required if --token is not provided)",
);
console.log(" --data-dir: Data directory (default: '.data'");
Deno.exit(0);
};

Expand All @@ -31,25 +35,31 @@ if (flags.help) showHelp();
// If no credentials or token are provided, proceed without authentication
if (!flags.creds && !flags.token) {
console.log(
"Warning: no credentials or token provided. Proceeding without authentication",
"Warning: no --creds or --token provided. Proceeding without authentication",
);
}

// If both credentials and token are provided, exit
if (flags.creds && flags.token) {
console.log(
"Error: both credentials and token provided. Please provide only one",
"Error: both --creds and --token provided. Please provide only one",
);
showHelp();
}

// Make sure nats-host is provided
if (!flags["nats-host"]) {
console.log("Error: --nats-host is required");
showHelp();
}

import { Nqlite, Options } from "./mod.ts";

// Startup nqlite
const nqlite = new Nqlite();

const opts: Options = {
url: flags["wshost"],
url: flags["nats-host"],
creds: flags["creds"],
token: flags["token"],
dataDir: flags["data-dir"],
Expand Down
142 changes: 73 additions & 69 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ import {
ObjectStore,
PubAck,
StringCodec,
} from "natsws";
} from "nats";
import { serve } from "serve";
import { Context, Hono } from "hono";
import { bootstrapDataDir, dbSetup, nats, restore, snapshot } from "./util.ts";
import {
bootstrapDataDir,
restore,
setupDb,
setupNats,
snapshot,
snapshotCheck,
} from "./util.ts";
import { Database } from "sqlite3";
import { NatsRes, Options, ParseRes, Res } from "./types.ts";
import { parse } from "./parse.ts";
Expand All @@ -29,13 +36,15 @@ export class Nqlite {
snapInterval: number;
snapThreshold: number;
jsm!: JetStreamManager;
inSnapShot: boolean;

// Create a constructor
constructor() {
this.app = "nqlite";
this.subject = `${this.app}.push`;
this.snapInterval = 2;
this.snapThreshold = 1024;
this.inSnapShot = false;
}

// Init function to connect to NATS
Expand All @@ -49,14 +58,14 @@ export class Nqlite {
await bootstrapDataDir(this.dataDir);

// Initialize NATS
const res: NatsRes = await nats({ url, app: this.app, creds, token });
const res: NatsRes = await setupNats({ url, app: this.app, creds, token });
({ js: this.js, os: this.os, jsm: this.jsm } = res);

// Restore from snapshot if exists
await restore(this.os, this.dbFile);

// Setup to the database
this.db = dbSetup(this.dbFile);
this.db = setupDb(this.dbFile);

// Setup the API
this.http();
Expand All @@ -69,19 +78,27 @@ export class Nqlite {

// Handle SIGINT
Deno.addSignalListener("SIGINT", async () => {
// Check if inSnapShot is true
if (this.inSnapShot) {
console.log("SIGINT received while in snapshot. Waiting 10 seconds...");
await new Promise((resolve) => setTimeout(resolve, 10000));
}

console.log("About to die! Draining subscription...");
await this.sub.drain();
await this.sub.destroy();
console.log("Closing the database");
this.db.close();
console.log("Removing the data directory");
await Deno.remove(this.dataDir, { recursive: true });
Deno.exit();
});
}

// Get the latest sequence number
getSeq(): number {
return this.db.prepare(`SELECT seq FROM _nqlite_`).get()!.seq as number;
const stmt = this.db.prepare(`SELECT seq FROM _nqlite_ where id = 1`);
const seq = stmt.get()!.seq;
stmt.finalize();
return seq as number;
}

// Set the latest sequence number
Expand All @@ -102,7 +119,7 @@ export class Nqlite {

// Check for simple bulk query
if (s.bulkItems.length && s.simple) {
for (const p of s.bulkItems) this.db.exec(p);
for (const p of s.bulkItems) this.db.prepare(p).run();
res.time = performance.now() - s.t;
res.results[0].last_insert_id = this.db.lastInsertRowId;
return res;
Expand All @@ -122,6 +139,7 @@ export class Nqlite {
if (s.isRead) {
res.results[0].rows = s.simple ? stmt.all() : stmt.all(...s.params);
res.time = performance.now() - s.t;
stmt.finalize();
return res;
}

Expand All @@ -137,19 +155,14 @@ export class Nqlite {
// Setup ephemeral consumer
async consumer(): Promise<void> {
// Get the latest sequence number
const opts = consumerOpts();
opts.manualAck();
opts.ackExplicit();
opts.maxAckPending(10);
opts.deliverTo(createInbox());

const seq = this.getSeq() + 1;
opts.startSequence(seq);

const opts = consumerOpts().manualAck().ackExplicit().maxAckPending(10)
.deliverTo(createInbox()).startSequence(seq).idleHeartbeat(500);

// Get the latest sequence number in the stream
const s = await this.jsm.streams.info(this.app);

console.log("Messages in the stream ->>", s.state.messages);
console.log("Starting sequence ->>", seq);
console.log("Last sequence in stream ->>", s.state.last_seq);

Expand Down Expand Up @@ -180,74 +193,73 @@ export class Nqlite {

// Handle NATS push consumer messages
async iterator(sub: JetStreamSubscription, lastSeq?: number) {
for await (const m of sub) {
const data = JSON.parse(this.sc.decode(m.data));

try {
const res = parse(data, performance.now());
try {
for await (const m of sub) {
const data = JSON.parse(this.sc.decode(m.data));

try {
const res = parse(data, performance.now());

// Handle errors
if (res.error) {
console.log("Parse error:", res.error);
m.ack();
this.setSeq(m.seq);
continue;
}

// Handle errors
if (res.error) {
console.log("Parse error:", res.error);
m.ack();
continue;
this.execute(res);
} catch (e) {
console.log("Execute error: ", e.message, "Query: ", data);
}

this.execute(res);
} catch (e) {
console.log("Execute error: ", e.message, "Query: ", data);
}

m.ack();
this.setSeq(m.seq);
m.ack();
this.setSeq(m.seq);

// Check for last sequence
if (lastSeq) {
if (m.seq === lastSeq) {
// Check for last sequence
if (lastSeq && m.seq === lastSeq) {
console.log("Caught up to last msg ->>", lastSeq);
}
}
} catch (e) {
console.log("Iterator error: ", e.message);
await this.consumer();
}
}

// Snapshot poller
async snapshotPoller() {
console.log("Starting snapshot poller");
while (true) {
this.inSnapShot = false;
// Wait for the interval to pass
await new Promise((resolve) =>
setTimeout(resolve, this.snapInterval * 60 * 60 * 1000)
);

try {
// First VACUUM the database to free up space
this.db.exec("VACUUM");
this.inSnapShot = true;

// Check object store for snapshot
const snapInfo = await this.os.info(this.app);
try {
// Unsubscribe from the stream so we stop receiving db updates
console.log("Drained subscription...");
await this.sub.drain();
await this.sub.destroy();

// Check if we need to snapshot
if (snapInfo) {
const processed = this.getSeq() - Number(snapInfo.description);
if (processed < this.snapThreshold) {
console.log(
`Skipping snapshot, threshold not met: ${processed} < ${this.snapThreshold}`,
);
continue;
}
}
// VACUUM the database to free up space
console.log("VACUUM...");
this.db.exec("VACUUM");

if (!snapInfo && this.getSeq() < this.snapThreshold) {
console.log(
`Skipping snapshot, threshold not met: ${this.getSeq()} < ${this.snapThreshold}`,
);
// Check if we should run a snapshot
const run = await snapshotCheck(
this.os,
this.getSeq(),
this.snapThreshold,
);
if (!run) {
await this.consumer();
continue;
}

// Unsubscribe from the stream so we stop receiving db updates
await this.sub.drain();
console.log("Drained subscription...");

// Snapshot the database to object store
let seq = this.getSeq();
if (await snapshot(this.os, this.dbFile, seq)) {
Expand Down Expand Up @@ -292,11 +304,7 @@ export class Nqlite {
const data = parse(JSON.parse(r), perf);
return c.json(this.execute(data));
} catch (e) {
if (e instanceof Error) {
res.results[0].error = e.message;
return c.json(res, 400);
}
res.results[0].error = "Invalid Query";
res.results[0].error = e.message;
return c.json(res, 400);
}
});
Expand Down Expand Up @@ -325,11 +333,7 @@ export class Nqlite {
? c.json(this.execute(data))
: c.json(await this.publish(data));
} catch (e) {
if (e instanceof Error) {
res.results[0].error = e.message;
return c.json(res, 400);
}
res.results[0].error = "Invalid Query";
res.results[0].error = e.message;
return c.json(res, 400);
}
});
Expand Down
4 changes: 3 additions & 1 deletion types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
JetStreamClient,
JetStreamManager,
ObjectStore,
} from "natsws";
} from "nats";
import { RestBindParameters } from "sqlite3";

export type NatsInit = {
Expand All @@ -18,6 +18,8 @@ export type NatsConf = {
servers: string;
authenticator?: Authenticator;
token?: string;
maxReconnectAttempts?: number;
waitOnFirstConnect?: boolean;
};

export type NatsRes = {
Expand Down
Loading

0 comments on commit 5eae39d

Please sign in to comment.