Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiple workflows and add proper engine persistance #7286

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/cyan-geese-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@cloudflare/workflows-shared": minor
"miniflare": minor
---

Add proper engine persistance in .wrangler and fix multiple workflows in miniflare
17 changes: 17 additions & 0 deletions fixtures/workflow-multiple/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "my-workflow-multiple",
"private": true,
"scripts": {
"deploy": "wrangler deploy",
"start": "wrangler dev",
"test:ci": "vitest"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20241106.0",
"undici": "catalog:default",
"wrangler": "workspace:*"
},
"volta": {
"extends": "../../package.json"
}
}
90 changes: 90 additions & 0 deletions fixtures/workflow-multiple/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import {
WorkerEntrypoint,
WorkflowEntrypoint,
WorkflowEvent,
WorkflowStep,
} from "cloudflare:workers";

type Params = {
name: string;
};

export class Demo extends WorkflowEntrypoint<{}, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const { timestamp, payload } = event;

await step.sleep("Wait", "1 second");

const result = await step.do("First step", async function () {
return {
output: "First step result",
};
});

await step.sleep("Wait", "1 second");

const result2 = await step.do("Second step", async function () {
return {
output: "workflow1",
};
});

return [result, result2, timestamp, payload, "workflow1"];
}
}

export class Demo2 extends WorkflowEntrypoint<{}, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const { timestamp, payload } = event;

await step.sleep("Wait", "1 second");

const result = await step.do("First step", async function () {
return {
output: "First step result",
};
});

await step.sleep("Wait", "1 second");

const result2 = await step.do("Second step", async function () {
return {
output: "workflow2",
};
});

return [result, result2, timestamp, payload, "workflow2"];
}
}

type Env = {
WORKFLOW: Workflow;
WORKFLOW2: Workflow;
};

export default class extends WorkerEntrypoint<Env> {
async fetch(req: Request) {
const url = new URL(req.url);
const id = url.searchParams.get("id");
const workflowName = url.searchParams.get("workflowName");

if (url.pathname === "/favicon.ico") {
return new Response(null, { status: 404 });
}
let workflowToUse =
workflowName == "2" ? this.env.WORKFLOW2 : this.env.WORKFLOW;

let handle: WorkflowInstance;
if (url.pathname === "/create") {
if (id === null) {
handle = await workflowToUse.create();
} else {
handle = await workflowToUse.create({ id });
}
} else {
handle = await workflowToUse.get(id);
}

return Response.json({ status: await handle.status(), id: handle.id });
}
}
127 changes: 127 additions & 0 deletions fixtures/workflow-multiple/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { rm } from "fs/promises";
import { resolve } from "path";
import { fetch } from "undici";
import { afterAll, beforeAll, describe, it, vi } from "vitest";
import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived";

describe("Workflows", () => {
let ip: string,
port: number,
stop: (() => Promise<unknown>) | undefined,
getOutput: () => string;

beforeAll(async () => {
// delete previous run contents because of persistence
await rm(resolve(__dirname, "..") + "/.wrangler", {
LuisDuarte1 marked this conversation as resolved.
Show resolved Hide resolved
force: true,
recursive: true,
});
({ ip, port, stop, getOutput } = await runWranglerDev(
resolve(__dirname, ".."),
["--port=0", "--inspector-port=0"]
));
});

afterAll(async () => {
await stop?.();
});

async function fetchJson(url: string) {
const response = await fetch(url, {
headers: {
"MF-Disable-Pretty-Error": "1",
},
});
const text = await response.text();

try {
return JSON.parse(text);
} catch (err) {
throw new Error(`Couldn't parse JSON:\n\n${text}`);
}
}

it("creates two instances with same id in two different workflows", async ({
expect,
}) => {
const createResult = {
id: "test",
status: {
status: "running",
output: [],
},
};

await Promise.all([
expect(
fetchJson(`http://${ip}:${port}/create?workflowName=1&id=test`)
).resolves.toStrictEqual(createResult),
expect(
fetchJson(`http://${ip}:${port}/create?workflowName=2&id=test`)
).resolves.toStrictEqual(createResult),
]);

const firstResult = {
id: "test",
status: {
status: "running",
output: [{ output: "First step result" }],
},
};
await Promise.all([
vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=1&id=test`)
).resolves.toStrictEqual(firstResult);
},
{ timeout: 5000 }
),
vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=2&id=test`)
).resolves.toStrictEqual(firstResult);
},
{ timeout: 5000 }
),
]);

await Promise.all([
await vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=1&id=test`)
).resolves.toStrictEqual({
id: "test",
status: {
status: "complete",
output: [
{ output: "First step result" },
{ output: "workflow1" },
],
},
});
},
{ timeout: 5000 }
),
await vi.waitFor(
async () => {
await expect(
fetchJson(`http://${ip}:${port}/status?workflowName=2&id=test`)
).resolves.toStrictEqual({
id: "test",
status: {
status: "complete",
output: [
{ output: "First step result" },
{ output: "workflow2" },
],
},
});
},
{ timeout: 5000 }
),
]);
});
});
7 changes: 7 additions & 0 deletions fixtures/workflow-multiple/tests/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"extends": "@cloudflare/workers-tsconfig/tsconfig.json",
"compilerOptions": {
"types": ["node"]
},
"include": ["**/*.ts", "../../../node-types.d.ts"]
}
13 changes: 13 additions & 0 deletions fixtures/workflow-multiple/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "CommonJS",
"lib": ["ES2020"],
"types": ["@cloudflare/workers-types"],
"moduleResolution": "node",
"noEmit": true,
"skipLibCheck": true
},
"include": ["**/*.ts"],
"exclude": ["tests"]
}
9 changes: 9 additions & 0 deletions fixtures/workflow-multiple/vitest.config.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { defineProject, mergeConfig } from "vitest/config";
import configShared from "../../vitest.shared";

export default mergeConfig(
configShared,
defineProject({
test: {},
})
);
14 changes: 14 additions & 0 deletions fixtures/workflow-multiple/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#:schema node_modules/wrangler/config-schema.json
name = "my-workflow-demo"
main = "src/index.ts"
compatibility_date = "2024-10-22"

[[workflows]]
binding = "WORKFLOW"
name = "my-workflow"
class_name = "Demo"

[[workflows]]
binding = "WORKFLOW2"
name = "my-workflow-2"
class_name = "Demo2"
13 changes: 7 additions & 6 deletions fixtures/workflow/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { rm } from "fs/promises";
import { resolve } from "path";
import { fetch } from "undici";
import { afterAll, beforeAll, describe, it, vi } from "vitest";
Expand All @@ -10,14 +11,14 @@ describe("Workflows", () => {
getOutput: () => string;

beforeAll(async () => {
// delete previous run contents because of persistence
await rm(resolve(__dirname, "..") + "/.wrangler", {
force: true,
recursive: true,
});
({ ip, port, stop, getOutput } = await runWranglerDev(
resolve(__dirname, ".."),
[
"--port=0",
"--inspector-port=0",
"--upstream-protocol=https",
"--host=prod.example.org",
]
["--port=0", "--inspector-port=0"]
));
});

Expand Down
5 changes: 0 additions & 5 deletions fixtures/workflow/worker-configuration.d.ts

This file was deleted.

20 changes: 13 additions & 7 deletions packages/miniflare/src/plugins/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,20 @@ export const WORKFLOWS_PLUGIN: Plugin<
sharedOptions.workflowsPersist
);
await fs.mkdir(persistPath, { recursive: true });
const storageService: Service = {
name: WORKFLOWS_STORAGE_SERVICE_NAME,
// each workflow should get its own storage service
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sanity check for me: why does each workflow need its own storage service? as far as i can tell all other plugins use one storage service for all instances.

Copy link
Contributor Author

@LuisDuarte1 LuisDuarte1 Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was personal preference mostly - this way each workflow gets its own folder in .wrangler/workflow and makes it way easier to debug the engine state

const storageServices: Service[] = Object.entries(
options.workflows ?? {}
).map<Service>(([_, workflow]) => ({
name: `${WORKFLOWS_STORAGE_SERVICE_NAME}-${workflow.name}`,
disk: { path: persistPath, writable: true },
};
}));

// this creates one miniflare service per workflow that the user's script has. we should dedupe engine definition later
const services = Object.entries(options.workflows ?? {}).map<Service>(
([_bindingName, workflow]) => {
const uniqueKey = `miniflare-workflows`;
// NOTE(lduarte): the engine unique namespace key must be unique per workflow definition
// otherwise workerd will crash because there's two equal DO namespaces
const uniqueKey = `miniflare-workflows-${workflow.name}`;
LuisDuarte1 marked this conversation as resolved.
Show resolved Hide resolved

const workflowsBinding: Service = {
name: `${WORKFLOWS_PLUGIN_NAME}:${workflow.name}`,
Expand All @@ -90,8 +95,9 @@ export const WORKFLOWS_PLUGIN: Plugin<
preventEviction: true,
},
],
// this might conflict between workflows
durableObjectStorage: { localDisk: WORKFLOWS_STORAGE_SERVICE_NAME },
durableObjectStorage: {
localDisk: `${WORKFLOWS_STORAGE_SERVICE_NAME}-${workflow.name}`,
},
bindings: [
{
name: "ENGINE",
Expand All @@ -116,7 +122,7 @@ export const WORKFLOWS_PLUGIN: Plugin<
return [];
}

return [storageService, ...services];
return [...storageServices, ...services];
},

getPersistPath({ workflowsPersist }, tmpPath) {
Expand Down
Loading
Loading