Skip to content

Commit

Permalink
Fix multiple workflows and add proper engine persistance (#7286)
Browse files Browse the repository at this point in the history
* fix: allow multiple workflow definitions

* feat: make engine persist logs and status over multiple runs

* chore: add persistance test and remove redundant env types

* chore: change changeset and remove redundant test options
  • Loading branch information
LuisDuarte1 authored Nov 20, 2024
1 parent fa21312 commit 563439b
Show file tree
Hide file tree
Showing 14 changed files with 436 additions and 38 deletions.
6 changes: 6 additions & 0 deletions .changeset/cyan-geese-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@cloudflare/workflows-shared": minor
"miniflare": minor
---

Add proper engine persistance in .wrangler and fix multiple workflows in miniflare
17 changes: 17 additions & 0 deletions fixtures/workflow-multiple/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "my-workflow-multiple",
"private": true,
"scripts": {
"deploy": "wrangler deploy",
"start": "wrangler dev",
"test:ci": "vitest"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20241106.0",
"undici": "catalog:default",
"wrangler": "workspace:*"
},
"volta": {
"extends": "../../package.json"
}
}
90 changes: 90 additions & 0 deletions fixtures/workflow-multiple/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import {
WorkerEntrypoint,
WorkflowEntrypoint,
WorkflowEvent,
WorkflowStep,
} from "cloudflare:workers";

type Params = {
name: string;
};

export class Demo extends WorkflowEntrypoint<{}, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const { timestamp, payload } = event;

await step.sleep("Wait", "1 second");

const result = await step.do("First step", async function () {
return {
output: "First step result",
};
});

await step.sleep("Wait", "1 second");

const result2 = await step.do("Second step", async function () {
return {
output: "workflow1",
};
});

return [result, result2, timestamp, payload, "workflow1"];
}
}

export class Demo2 extends WorkflowEntrypoint<{}, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const { timestamp, payload } = event;

await step.sleep("Wait", "1 second");

const result = await step.do("First step", async function () {
return {
output: "First step result",
};
});

await step.sleep("Wait", "1 second");

const result2 = await step.do("Second step", async function () {
return {
output: "workflow2",
};
});

return [result, result2, timestamp, payload, "workflow2"];
}
}

type Env = {
WORKFLOW: Workflow;
WORKFLOW2: Workflow;
};

export default class extends WorkerEntrypoint<Env> {
async fetch(req: Request) {
const url = new URL(req.url);
const id = url.searchParams.get("id");
const workflowName = url.searchParams.get("workflowName");

if (url.pathname === "/favicon.ico") {
return new Response(null, { status: 404 });
}
let workflowToUse =
workflowName == "2" ? this.env.WORKFLOW2 : this.env.WORKFLOW;

let handle: WorkflowInstance;
if (url.pathname === "/create") {
if (id === null) {
handle = await workflowToUse.create();
} else {
handle = await workflowToUse.create({ id });
}
} else {
handle = await workflowToUse.get(id);
}

return Response.json({ status: await handle.status(), id: handle.id });
}
}
127 changes: 127 additions & 0 deletions fixtures/workflow-multiple/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { rm } from "fs/promises";
import { resolve } from "path";
import { fetch } from "undici";
import { afterAll, beforeAll, describe, it, vi } from "vitest";
import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived";

describe("Workflows", () => {
let ip: string,
port: number,
stop: (() => Promise<unknown>) | undefined,
getOutput: () => string;

beforeAll(async () => {
// delete previous run contents because of persistence
await rm(resolve(__dirname, "..") + "/.wrangler", {
force: true,
recursive: true,
});
({ ip, port, stop, getOutput } = await runWranglerDev(
resolve(__dirname, ".."),
["--port=0", "--inspector-port=0"]
));
});

afterAll(async () => {
await stop?.();
});

async function fetchJson(url: string) {
const response = await fetch(url, {
headers: {
"MF-Disable-Pretty-Error": "1",
},
});
const text = await response.text();

try {
return JSON.parse(text);
} catch (err) {
throw new Error(`Couldn't parse JSON:\n\n${text}`);
}
}

it("creates two instances with same id in two different workflows", async ({
expect,
}) => {
const createResult = {
id: "test",
status: {
status: "running",
output: [],
},
};

await Promise.all([
expect(
fetchJson(`http://${ip}:${port}/create?workflowName=1&id=test`)
).resolves.toStrictEqual(createResult),
expect(
fetchJson(`http://${ip}:${port}/create?workflowName=2&id=test`)
).resolves.toStrictEqual(createResult),
]);

const firstResult = {
id: "test",
status: {
status: "running",
output: [{ output: "First step result" }],
},
};
await Promise.all([
vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=1&id=test`)
).resolves.toStrictEqual(firstResult);
},
{ timeout: 5000 }
),
vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=2&id=test`)
).resolves.toStrictEqual(firstResult);
},
{ timeout: 5000 }
),
]);

await Promise.all([
await vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=1&id=test`)
).resolves.toStrictEqual({
id: "test",
status: {
status: "complete",
output: [
{ output: "First step result" },
{ output: "workflow1" },
],
},
});
},
{ timeout: 5000 }
),
await vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=2&id=test`)
).resolves.toStrictEqual({
id: "test",
status: {
status: "complete",
output: [
{ output: "First step result" },
{ output: "workflow2" },
],
},
});
},
{ timeout: 5000 }
),
]);
});
});
7 changes: 7 additions & 0 deletions fixtures/workflow-multiple/tests/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extends": "@cloudflare/workers-tsconfig/tsconfig.json",
"compilerOptions": {
"types": ["node"]
},
"include": ["**/*.ts", "../../../node-types.d.ts"]
}
13 changes: 13 additions & 0 deletions fixtures/workflow-multiple/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "CommonJS",
"lib": ["ES2020"],
"types": ["@cloudflare/workers-types"],
"moduleResolution": "node",
"noEmit": true,
"skipLibCheck": true
},
"include": ["**/*.ts"],
"exclude": ["tests"]
}
9 changes: 9 additions & 0 deletions fixtures/workflow-multiple/vitest.config.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { defineProject, mergeConfig } from "vitest/config";
import configShared from "../../vitest.shared";

export default mergeConfig(
configShared,
defineProject({
test: {},
})
);
14 changes: 14 additions & 0 deletions fixtures/workflow-multiple/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#:schema node_modules/wrangler/config-schema.json
name = "my-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-10-22"

[[workflows]]
binding = "WORKFLOW"
name = "my-workflow"
class_name = "Demo"

[[workflows]]
binding = "WORKFLOW2"
name = "my-workflow-2"
class_name = "Demo2"
13 changes: 7 additions & 6 deletions fixtures/workflow/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { rm } from "fs/promises";
import { resolve } from "path";
import { fetch } from "undici";
import { afterAll, beforeAll, describe, it, vi } from "vitest";
Expand All @@ -10,14 +11,14 @@ describe("Workflows", () => {
getOutput: () => string;

beforeAll(async () => {
// delete previous run contents because of persistence
await rm(resolve(__dirname, "..") + "/.wrangler", {
force: true,
recursive: true,
});
({ ip, port, stop, getOutput } = await runWranglerDev(
resolve(__dirname, ".."),
[
"--port=0",
"--inspector-port=0",
"--upstream-protocol=https",
"--host=prod.example.org",
]
["--port=0", "--inspector-port=0"]
));
});

Expand Down
5 changes: 0 additions & 5 deletions fixtures/workflow/worker-configuration.d.ts

This file was deleted.

20 changes: 13 additions & 7 deletions packages/miniflare/src/plugins/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,20 @@ export const WORKFLOWS_PLUGIN: Plugin<
sharedOptions.workflowsPersist
);
await fs.mkdir(persistPath, { recursive: true });
const storageService: Service = {
name: WORKFLOWS_STORAGE_SERVICE_NAME,
// each workflow should get its own storage service
const storageServices: Service[] = Object.entries(
options.workflows ?? {}
).map<Service>(([_, workflow]) => ({
name: `${WORKFLOWS_STORAGE_SERVICE_NAME}-${workflow.name}`,
disk: { path: persistPath, writable: true },
};
}));

// this creates one miniflare service per workflow that the user's script has. we should dedupe engine definition later
const services = Object.entries(options.workflows ?? {}).map<Service>(
([_bindingName, workflow]) => {
const uniqueKey = `miniflare-workflows`;
// NOTE(lduarte): the engine unique namespace key must be unique per workflow definition
// otherwise workerd will crash because there's two equal DO namespaces
const uniqueKey = `miniflare-workflows-${workflow.name}`;

const workflowsBinding: Service = {
name: `${WORKFLOWS_PLUGIN_NAME}:${workflow.name}`,
Expand All @@ -90,8 +95,9 @@ export const WORKFLOWS_PLUGIN: Plugin<
preventEviction: true,
},
],
// this might conflict between workflows
durableObjectStorage: { localDisk: WORKFLOWS_STORAGE_SERVICE_NAME },
durableObjectStorage: {
localDisk: `${WORKFLOWS_STORAGE_SERVICE_NAME}-${workflow.name}`,
},
bindings: [
{
name: "ENGINE",
Expand All @@ -116,7 +122,7 @@ export const WORKFLOWS_PLUGIN: Plugin<
return [];
}

return [storageService, ...services];
return [...storageServices, ...services];
},

getPersistPath({ workflowsPersist }, tmpPath) {
Expand Down
Loading

0 comments on commit 563439b

Please sign in to comment.