Skip to content

Commit

Permalink
Merge pull request #16 from voxoco/feat-external-backup
Browse files Browse the repository at this point in the history
added http backup method (Cloudflare R2)
  • Loading branch information
jmordica committed Mar 29, 2023
2 parents 20edcbc + 03cffe0 commit 4150f99
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 16 deletions.
33 changes: 31 additions & 2 deletions main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ import { parse } from "https://deno.land/std@0.168.0/flags/mod.ts";

const flags = parse(Deno.args, {
boolean: ["help"],
string: ["nats-host", "creds", "token", "data-dir"],
string: [
"nats-host",
"creds",
"token",
"data-dir",
"external-backup",
"external-backup-url",
],
alias: { h: "help" },
stopEarly: true,
default: {
Expand All @@ -11,6 +18,8 @@ const flags = parse(Deno.args, {
creds: "",
token: "",
"data-dir": ".nqlite-data",
"external-backup": "",
"external-backup-url": "",
},
});

Expand All @@ -26,7 +35,13 @@ const showHelp = () => {
console.log(
" --creds: NATS credentials file (required if --token is not provided)",
);
console.log(" --data-dir: Data directory (default: '.data'");
console.log(" --data-dir: Data directory (default: '.nqlite-data/')");
console.log(
" --external-backup: External backup/restore method (option: 'http')",
);
console.log(
" --external-backup-url: The HTTP url for backup/restore (only required if --external-backup is provided)",
);
Deno.exit(0);
};

Expand All @@ -53,6 +68,18 @@ if (!flags["nats-host"]) {
showHelp();
}

// Check if external backup is provided, and if so, make sure the url is provided
if (flags["external-backup"] && !flags["external-backup-url"]) {
console.log("Error: --external-backup-url is required");
showHelp();
}

// Make sure only allowed external backup methods are provided
if (flags["external-backup"] && flags["external-backup"] !== "http") {
console.log("Error: --external-backup only supports 'http'");
showHelp();
}

import { Nqlite, Options } from "./mod.ts";

// Startup nqlite
Expand All @@ -63,6 +90,8 @@ const opts: Options = {
creds: flags["creds"],
token: flags["token"],
dataDir: flags["data-dir"],
externalBackup: flags["external-backup"],
externalBackupUrl: flags["external-backup-url"],
};

await nqlite.init(opts);
28 changes: 23 additions & 5 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { serve } from "serve";
import { Context, Hono } from "hono";
import {
bootstrapDataDir,
httpBackup,
httpRestore,
restore,
setupDb,
setupNats,
Expand All @@ -40,6 +42,8 @@ export class Nqlite {
snapThreshold: number;
jsm!: JetStreamManager;
inSnapShot: boolean;
externalBackup!: string;
externalBackupUrl!: string;

// Create a constructor
constructor() {
Expand All @@ -52,10 +56,13 @@ export class Nqlite {

// Init function to connect to NATS
async init(opts: Options): Promise<void> {
const { url, creds, token, dataDir } = opts;
const { url, creds, token, dataDir, externalBackup, externalBackupUrl } =
opts;

this.dataDir = `${dataDir}/${this.app}`;
this.dbFile = `${this.dataDir}/nqlite.db`;
this.externalBackup = externalBackup;
this.externalBackupUrl = externalBackupUrl;

// Bootstrap the dataDir
await bootstrapDataDir(this.dataDir);
Expand All @@ -65,7 +72,13 @@ export class Nqlite {
({ nc: this.nc, js: this.js, os: this.os, jsm: this.jsm } = res);

// Restore from snapshot if exists
await restore(this.os, this.dbFile);
const restoreRes = await restore(this.os, this.dbFile);
if (!restoreRes) {
// Restore from external backup
if (externalBackup === "http") {
await httpRestore(this.dbFile, externalBackupUrl);
}
}

// Setup to the database
this.db = setupDb(this.dbFile);
Expand Down Expand Up @@ -285,16 +298,21 @@ export class Nqlite {
continue;
}

// Snapshot the database to object store
// Snapshot the database to object store and/or external storage
let seq = this.getSeq();
if (await snapshot(this.os, this.dbFile, seq)) {
// Purge previos messages from the stream older than seq - snapThreshold
if (await snapshot(this.os, this.dbFile)) {
// Purge previous messages from the stream older than seq - snapThreshold
seq = seq - this.snapThreshold;
await this.jsm.streams.purge(this.app, {
filter: this.subject,
seq: seq < 0 ? 0 : seq,
});
}

// Attempting to backup the database to external storage
if (this.externalBackup === "http") {
await httpBackup(this.dbFile, this.externalBackupUrl);
}
} catch (e) {
console.log("Error during snapshot polling:", e.message);
}
Expand Down
2 changes: 2 additions & 0 deletions types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ export type Options = {
creds: string;
token: string;
dataDir: string;
externalBackup: string;
externalBackupUrl: string;
};
53 changes: 44 additions & 9 deletions util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,17 @@ export function setupDb(file: string): Database {
return db;
}

export async function restore(os: ObjectStore, db: string): Promise<void> {
export async function restore(os: ObjectStore, db: string): Promise<boolean> {
// See if snapshot exists in object store
const o = await os.get("snapshot");

if (!o) {
console.log("No snapshot object to restore");
return;
return false;
}

console.log(
`Restoring from snapshot with seq ${o.info.description} taken: ${o.info.mtime}`,
`Restoring from snapshot taken: ${o.info.mtime}`,
);

// Get the object
Expand All @@ -127,6 +127,7 @@ export async function restore(os: ObjectStore, db: string): Promise<void> {
const mb = (o.info.size / 1024 / 1024).toFixed(2);

console.log(`Restored from snapshot: ${mb}Mb`);
return true;
}

async function fromReadableStream(
Expand Down Expand Up @@ -161,20 +162,19 @@ function readableStreamFrom(data: Uint8Array): ReadableStream<Uint8Array> {
export async function snapshot(
os: ObjectStore,
db: string,
seq: number,
): Promise<boolean> {
try {
// Put the sqlite file in the object store
const info = await os.put({
name: "snapshot",
description: `${seq}`,
}, readableStreamFrom(await Deno.readFile(db)));
const info = await os.put(
{ name: "snapshot" },
readableStreamFrom(Deno.readFileSync(db)),
);

// Convert bytes to megabytes
const mb = (info.size / 1024 / 1024).toFixed(2);

console.log(
`Snapshot with sequence number ${seq} stored in object store: ${mb}Mb`,
`Snapshot stored in object store: ${mb}Mb`,
);
return true;
} catch (e) {
Expand Down Expand Up @@ -233,6 +233,41 @@ export async function snapshotCheck(
return true;
}

export async function httpBackup(db: string, url: string): Promise<boolean> {
// Backup to HTTP using the fetch API
try {
const res = await fetch(url, {
method: "POST",
body: Deno.readFileSync(db),
});
console.log("HTTP backup response:", res.status, res.statusText);
if (res.status !== 200) return false;
const mb = (Deno.statSync(db).size / 1024 / 1024).toFixed(2);
console.log(`Snapshot stored via http: ${mb}Mb`);
return true;
} catch (e) {
console.log("Error during http backup:", e.message);
return false;
}
}

export async function httpRestore(db: string, url: string): Promise<boolean> {
// Restore from HTTP using the fetch API
try {
const res = await fetch(url);
console.log("HTTP restore response:", res.status, res.statusText);
if (res.status !== 200) return false;
const file = await Deno.open(db, { write: true, create: true });
await res.body?.pipeTo(file.writable);
const mb = (Deno.statSync(db).size / 1024 / 1024).toFixed(2);
console.log(`Restored from http snapshot: ${mb}Mb`);
return true;
} catch (e) {
console.log("Error during http restore:", e.message);
return false;
}
}

export async function sigHandler(
inSnap: boolean,
sub: JetStreamSubscription,
Expand Down

0 comments on commit 4150f99

Please sign in to comment.