Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Workflow binding module #2677

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/cloudflare/internal/test/workflows/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
load("//:build/wd_test.bzl", "wd_test")

wd_test(
src = "workflows-api-test.wd-test",
args = ["--experimental"],
data = glob(["*.js"]),
)
21 changes: 21 additions & 0 deletions src/cloudflare/internal/test/workflows/workflows-api-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import * as assert from 'node:assert';

export const tests = {
async test(_, env) {
{
// Test create instance
const instance = await env.workflow.create('foo', { bar: 'baz' });
assert.deepStrictEqual(instance.id, 'foo');
}

{
// Test get instance
const instance = await env.workflow.get('bar');
assert.deepStrictEqual(instance.id, 'bar');
}
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "workflows-api-test",
worker = (
modules = [
(name = "worker", esModule = embed "workflows-api-test.js")
],
compatibilityDate = "2024-09-02",
compatibilityFlags = ["experimental", "nodejs_compat"],
bindings = [
(
name = "workflow",
wrapped = (
moduleName = "cloudflare-internal:workflows-api",
innerBindings = [(
name = "fetcher",
service = "workflows-mock"
)],
)
)
],
)
),
( name = "workflows-mock",
worker = (
compatibilityDate = "2024-09-02",
compatibilityFlags = ["experimental", "nodejs_compat"],
modules = [
(name = "worker", esModule = embed "workflows-mock.js")
],
)
)
]
);
41 changes: 41 additions & 0 deletions src/cloudflare/internal/test/workflows/workflows-mock.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export default {
async fetch(request, env, ctx) {
const data = await request.json();

if (request.url.includes('/get') && request.method === 'POST') {
return Response.json(
{
result: {
instanceId: data.id,
},
},
{
status: 200,
headers: {
'content-type': 'application/json',
},
}
);
}

if (request.url.includes('/create') && request.method === 'POST') {
return Response.json(
{
result: {
instanceId: data.id,
},
},
{
status: 201,
headers: {
'content-type': 'application/json',
},
}
);
}
},
};
117 changes: 113 additions & 4 deletions src/cloudflare/internal/workflows-api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,121 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export class NonRetryableError extends Error {
// `__brand` is needed for Workflows' engine to validate if the user returned a NonRetryableError
// this provides better DX because they can just extend NonRetryableError for their own Errors
// and override name.
// This needs to be a public field because it's serialized over RPC to the Workflows' engine
// `__brand` is how engine validates that the user returned a `NonRetryableError`
// imported from "cloudflare:workflows"
// This enables them to extend NonRetryableError for their own Errors
// as well by overriding name
// Private fields are not serialized over RPC
public readonly __brand: string = 'NonRetryableError';

public constructor(message: string, name = 'NonRetryableError') {
super(message);
this.name = name;
}
}

interface Fetcher {
fetch: typeof fetch;
}

async function callFetcher<T>(
fetcher: Fetcher,
path: string,
body: object
): Promise<T> {
const res = await fetcher.fetch(`http://workflow-binding.local${path}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Version': '1',
},
body: JSON.stringify(body),
});

const response = (await res.json()) as {
result: T;
error?: WorkflowError;
};

if (res.ok) {
return response.result;
} else {
throw new Error(response.error?.message);
}
}

class InstanceImpl implements Instance {
private readonly fetcher: Fetcher;
public readonly id: string;

public constructor(id: string, fetcher: Fetcher) {
this.id = id;
this.fetcher = fetcher;
}

public async pause(): Promise<void> {
await callFetcher(this.fetcher, '/pause', {
id: this.id,
});
}
public async resume(): Promise<void> {
await callFetcher(this.fetcher, '/resume', {
id: this.id,
});
}

public async abort(): Promise<void> {
await callFetcher(this.fetcher, '/abort', {
id: this.id,
});
}

public async restart(): Promise<void> {
await callFetcher(this.fetcher, '/restart', {
id: this.id,
});
}

public async status(): Promise<InstanceStatus> {
const result = await callFetcher<InstanceStatus>(this.fetcher, '/status', {
id: this.id,
});
return result;
}
}

class WorkflowImpl {
private readonly fetcher: Fetcher;

public constructor(fetcher: Fetcher) {
this.fetcher = fetcher;
}

public async get(id: string): Promise<Instance> {
const result = await callFetcher<{ instanceId: string }>(
this.fetcher,
'/get',
{ id }
);

return new InstanceImpl(result.instanceId, this.fetcher);
}

public async create(id: string, params: object): Promise<Instance> {
const result = await callFetcher<{ instanceId: string }>(
this.fetcher,
'/create',
{ id, params }
);

return new InstanceImpl(result.instanceId, this.fetcher);
}
}

export function makeBinding(env: { fetcher: Fetcher }): Workflow {
return new WorkflowImpl(env.fetcher);
}

export default makeBinding;
87 changes: 87 additions & 0 deletions src/cloudflare/internal/workflows.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) 2022-2023 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

/*****************************
*
* NOTE: this is copy & pasted from the types/ folder, as when bazel
* runs it doesn't have access to that directly and thusly is sad.
* TODO: come up with a better system for this.
*
****************************** /

/**
* NonRetryableError allows for a user to throw a fatal error
* that makes a Workflow instance fail immediately without triggering a retry
*/
declare abstract class NonRetryableError extends Error {
/**
* `__brand` is used to differentiate between `NonRetryableError` and `Error`
* and is omitted from the constructor because users should not set it
*/
public constructor(message: string, name?: string);
}

declare abstract class Workflow {
/**
* Get a handle to an existing instance of the Workflow.
* @param id Id for the instance of this Workflow
* @returns A promise that resolves with a handle for the Instance
*/
public get(id: string): Promise<Instance>;

/**
* Create a new instance and return a handle to it. If a provided id exists, an error will be thrown.
* @param id Id to create the instance of this Workflow with
* @param params The payload to send over to this instance
* @returns A promise that resolves with a handle for the Instance
*/
public create(id: string, params: object): Promise<Instance>;
}

type InstanceStatus = {
status:
| 'queued'
| 'running'
| 'paused'
| 'errored'
| 'terminated'
| 'complete'
| 'unknown';
error?: string;
output?: object;
};

interface WorkflowError {
code?: number;
message: string;
}

declare abstract class Instance {
public id: string;

/**
* Pause the instance.
*/
public pause(): Promise<void>;

/**
* Resume the instance. If it is already running, an error will be thrown.
*/
public resume(): Promise<void>;

/**
* Abort the instance. If it is errored, terminated or complete, an error will be thrown.
*/
public abort(): Promise<void>;

/**
* Restart the instance.
*/
public restart(): Promise<void>;

/**
* Returns the current status of the instance.
*/
public status(): Promise<InstanceStatus>;
}
4 changes: 4 additions & 0 deletions src/cloudflare/workflows.ts
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
// Copyright (c) 2024 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

export { NonRetryableError } from 'cloudflare-internal:workflows-api';
Loading
Loading