Skip to content

Commit

Permalink
feat: make engine persist logs and status over multiple runs
Browse files Browse the repository at this point in the history
  • Loading branch information
LuisDuarte1 committed Nov 20, 2024
1 parent 7e6fed4 commit 9934afa
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 21 deletions.
6 changes: 6 additions & 0 deletions .changeset/cyan-geese-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@cloudflare/workflows-shared": minor
"miniflare": patch
---

Add proper engine persistance in .wrangler and fix multiple workflows in miniflare
2 changes: 1 addition & 1 deletion fixtures/workflow-multiple/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe("Workflows", () => {
}
}

it.only("creates two instances with same id in two different workflows", async ({
it("creates two instances with same id in two different workflows", async ({
expect,
}) => {
const createResult = {
Expand Down
6 changes: 6 additions & 0 deletions fixtures/workflow/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { rm } from "fs/promises";
import { resolve } from "path";
import { fetch } from "undici";
import { afterAll, beforeAll, describe, it, vi } from "vitest";
Expand All @@ -10,6 +11,11 @@ describe("Workflows", () => {
getOutput: () => string;

beforeAll(async () => {
// delete previous run contents because of persistence
await rm(resolve(__dirname, "..") + "/.wrangler", {
force: true,
recursive: true,
});
({ ip, port, stop, getOutput } = await runWranglerDev(
resolve(__dirname, ".."),
[
Expand Down
69 changes: 49 additions & 20 deletions packages/workflows-shared/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ export type DatabaseInstance = {
ended_on: string | null;
};

const ENGINE_STATUS_KEY = "ENGINE_STATUS";

export class Engine extends DurableObject<Env> {
logs: Array<unknown> = [];
status: InstanceStatus = InstanceStatus.Queued;

isRunning: boolean = false;
accountId: number | undefined;
Expand All @@ -66,21 +67,32 @@ export class Engine extends DurableObject<Env> {

constructor(state: DurableObjectState, env: Env) {
super(state, env);

void this.ctx.blockConcurrencyWhile(async () => {
this.ctx.storage.transactionSync(() => {
this.ctx.storage.sql.exec(`
CREATE TABLE IF NOT EXISTS priority_queue (
id INTEGER PRIMARY KEY NOT NULL,
created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
target_timestamp INTEGER NOT NULL,
action INTEGER NOT NULL, -- should only be 0 or 1 (1 for added, 0 for deleted),
entryType INTEGER NOT NULL,
hash TEXT NOT NULL,
CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1
UNIQUE (action, entryType, hash)
)
`);
try {
this.ctx.storage.sql.exec(`
CREATE TABLE IF NOT EXISTS priority_queue (
id INTEGER PRIMARY KEY NOT NULL,
created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
target_timestamp INTEGER NOT NULL,
action INTEGER NOT NULL, -- should only be 0 or 1 (1 for added, 0 for deleted),
entryType INTEGER NOT NULL,
hash TEXT NOT NULL,
CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1
UNIQUE (action, entryType, hash)
);
CREATE TABLE IF NOT EXISTS states (
id INTEGER PRIMARY KEY NOT NULL,
groupKey TEXT,
target TEXT,
metadata TEXT,
event INTEGER NOT NULL
)
`);
} catch (e) {
console.error(e);
throw e;
}
});
});

Expand All @@ -96,38 +108,55 @@ export class Engine extends DurableObject<Env> {
target: string | null = null,
metadata: Record<string, unknown>
) {
this.logs.push({
this.ctx.storage.sql.exec(
"INSERT INTO states (event, groupKey, target, metadata) VALUES (?, ?, ?, ?)",
event,
group,
target,
metadata,
});
JSON.stringify(metadata)
);
}

readLogsFromStep(_cacheKey: string): RawInstanceLog[] {
return [];
}

readLogs(): InstanceLogsResponse {
const logs = [
...this.ctx.storage.sql.exec<Record<string, string | number>>(
"SELECT event, groupKey, target, metadata FROM states"
),
];

return {
// @ts-expect-error TODO: Fix this
logs: this.logs,
logs: logs.map((log) => ({
...log,
metadata: JSON.parse(log.metadata as string),
group: log.groupKey,
})),
};
}

async getStatus(
_accountId: number,
_instanceId: string
): Promise<InstanceStatus> {
return this.status;
const res = await this.ctx.storage.get<InstanceStatus>(ENGINE_STATUS_KEY);

// NOTE(lduarte): if status don't exist, means that engine is running for the first time, so we assume queued
if (res === undefined) {
return InstanceStatus.Queued;
}
return res;
}

async setStatus(
accountId: number,
instanceId: string,
status: InstanceStatus
): Promise<void> {
this.status = status;
await this.ctx.storage.put(ENGINE_STATUS_KEY, status);
}

async abort(_reason: string) {
Expand Down

0 comments on commit 9934afa

Please sign in to comment.