Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow Queues - Typescript #632

Merged
merged 30 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
876a94a
Start sysdb
chuck-dbos Sep 24, 2024
78041e7
Sys db attempt
chuck-dbos Sep 24, 2024
1190813
Fixes for sysdb
chuck-dbos Sep 24, 2024
d525bf0
Stub of WF Q
chuck-dbos Sep 24, 2024
e4eaeaa
Probably better as a start_workflow
chuck-dbos Sep 24, 2024
8016e1b
Pass q name through
chuck-dbos Sep 24, 2024
988aba8
Exec thread
chuck-dbos Sep 25, 2024
b997205
Dequeues
chuck-dbos Sep 25, 2024
ad8b572
Exec flag
chuck-dbos Sep 25, 2024
0ee6762
take out
chuck-dbos Sep 25, 2024
e7d9917
Start Q
chuck-dbos Sep 26, 2024
462f77b
Not going
chuck-dbos Sep 26, 2024
f51ed91
Merge remote-tracking branch 'origin/main' into chuck/wfq
chuck-dbos Sep 26, 2024
0113b44
Fix test wf
chuck-dbos Sep 26, 2024
adb1252
Test serial Q
chuck-dbos Sep 26, 2024
f49375f
Child WF test
chuck-dbos Sep 26, 2024
18db225
Start moving rate limit over
chuck-dbos Sep 27, 2024
ce51115
Pass in q description
chuck-dbos Sep 27, 2024
e8d8ee0
Put inside
chuck-dbos Sep 27, 2024
44eea82
Merge remote-tracking branch 'origin/main' into chuck/wfq
chuck-dbos Sep 27, 2024
584fc20
Removal
chuck-dbos Sep 27, 2024
55df0cc
Make tx
chuck-dbos Sep 28, 2024
ab073f6
Fix cols
chuck-dbos Sep 28, 2024
d046934
Port of rate limiting
chuck-dbos Sep 30, 2024
a987ba1
Add empty q check
chuck-dbos Sep 30, 2024
f33e657
Add rate limit simplest test
chuck-dbos Sep 30, 2024
29810a4
WF queue limited tests
chuck-dbos Sep 30, 2024
b6131e5
Merge remote-tracking branch 'origin/main' into chuck/wfq
chuck-dbos Oct 1, 2024
551fb8a
First part
chuck-dbos Oct 1, 2024
cfdbf57
Add test
chuck-dbos Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions migrations/20240924000000_workflowqueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
exports.up = function(knex) {
return knex.schema.withSchema('dbos')
.table('workflow_status', function(table) {
table.text('queue_name')
.defaultTo(null);
})
.createTable('workflow_queue', function(table) {
table.text('queue_name').notNullable();
table.text('workflow_uuid').notNullable();
table.bigInteger('created_at_epoch_ms').notNullable().defaultTo(knex.raw('(EXTRACT(EPOCH FROM now())*1000)::bigint'));
table.bigInteger('started_at_epoch_ms');
table.bigInteger('completed_at_epoch_ms');
table.primary(['workflow_uuid']);
table.foreign('workflow_uuid').references('workflow_uuid').inTable('dbos.workflow_status').onDelete('CASCADE').onUpdate('CASCADE');
})
};

exports.down = function(knex) {
return knex.schema.withSchema('dbos')
.table('workflow_status', function(table) {
table.dropColumn('queue_name');
})
.dropTableIfExists('workflow_queue');
};
2 changes: 1 addition & 1 deletion packages/dbos-cloud/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ workflowCommands
.option('-U, --user <string>', 'Retrieve workflows run by this user')
.option('-s, --start-time <string>', 'Retrieve workflows starting after this timestamp (ISO 8601 format)')
.option('-e, --end-time <string>', 'Retrieve workflows starting before this timestamp (ISO 8601 format)')
.option('-S, --status <string>', 'Retrieve workflows with this status (PENDING, SUCCESS, ERROR, RETRIES_EXCEEDED, or CANCELLED)')
.option('-S, --status <string>', 'Retrieve workflows with this status (PENDING, SUCCESS, ERROR, RETRIES_EXCEEDED, ENQUEUED, or CANCELLED)')
.option('-v, --application-version <string>', 'Retrieve workflows with this application version')
.action(async (appName: string | undefined, options: { limit?: string, appDir?: string, user?: string, startTime?: string, endTime?: string, status?: string, applicationVersion?: string, workflowUUIDs?: string[], offset?: string }) => {
const input: ListWorkflowsInput = {
Expand Down
9 changes: 9 additions & 0 deletions schemas/system_db_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface workflow_status {
request: string; // Serialized HTTPRequest
executor_id: string; // Set to "local" for local deployment, set to microVM ID for cloud deployment.
application_version: string;
queue_name?: string;
}

export interface notifications {
Expand Down Expand Up @@ -42,3 +43,11 @@ export interface scheduler_state {
workflow_fn_name: string;
last_run_time: number; // Time that has certainly been kicked off; others may have but OAOO will cover that
}

export interface workflow_queue {
workflow_uuid: string;
queue_name: string;
created_at_epoch_ms: number; // This time is provided by the database
started_at_epoch_ms?: number; // This time is provided by the client
completed_at_epoch_ms?: number; // This time is provided by the client
}
68 changes: 49 additions & 19 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ import { DBOSJSON, sleepms } from './utils';
import path from 'node:path';
import { StoredProcedure, StoredProcedureConfig } from './procedure';
import { NoticeMessage } from "pg-protocol/dist/messages";
import { DBOSEventReceiver, DBOSExecutorContext } from ".";
import { DBOSEventReceiver, DBOSExecutorContext} from ".";

import { get } from "lodash";
import { wfQueueRunner, WorkflowQueue } from "./wfqueue";

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface DBOSNull { }
Expand Down Expand Up @@ -643,10 +644,11 @@ export class DBOSExecutor implements DBOSExecutorContext {

const internalStatus: WorkflowStatusInternal = {
workflowUUID: workflowUUID,
status: StatusString.PENDING,
status: (params.queueName !== undefined) ? StatusString.ENQUEUED : StatusString.PENDING,
name: wf.name,
className: wCtxt.isTempWorkflow ? "" : getRegisteredMethodClassName(wf),
configName: params.configuredInstance?.name || "",
queueName: params.queueName,
authenticatedUser: wCtxt.authenticatedUser,
output: undefined,
error: "",
Expand All @@ -668,9 +670,11 @@ export class DBOSExecutor implements DBOSExecutorContext {

// Synchronously set the workflow's status to PENDING and record workflow inputs (for non single-transaction workflows).
// We have to do it for all types of workflows because operation_outputs table has a foreign key constraint on workflow status table.
if (wCtxt.tempWfOperationType !== TempWorkflowType.transaction
&& wCtxt.tempWfOperationType !== TempWorkflowType.procedure
if ((wCtxt.tempWfOperationType !== TempWorkflowType.transaction
&& wCtxt.tempWfOperationType !== TempWorkflowType.procedure)
|| params.queueName !== undefined
) {
// TODO: Make this transactional (and with the queue step below)
args = await this.systemDatabase.initWorkflowStatus(internalStatus, args);
}

Expand All @@ -682,6 +686,12 @@ export class DBOSExecutor implements DBOSExecutorContext {
result = await wf.call(params.configuredInstance, wCtxt, ...args);
internalStatus.output = result;
internalStatus.status = StatusString.SUCCESS;
if (internalStatus.queueName) {
// Now... the workflow isn't certainly done.
// But waiting this long is for concurrency control anyway,
// so it is probably done enough.
await this.systemDatabase.dequeueWorkflow(workflowUUID, this.#getQueueByName(internalStatus.queueName));
}
this.systemDatabase.bufferWorkflowOutput(workflowUUID, internalStatus);
wCtxt.span.setStatus({ code: SpanStatusCode.OK });
} catch (err) {
Expand All @@ -701,6 +711,9 @@ export class DBOSExecutor implements DBOSExecutorContext {
}
internalStatus.error = DBOSJSON.stringify(serializeError(e));
internalStatus.status = StatusString.ERROR;
if (internalStatus.queueName) {
await this.systemDatabase.dequeueWorkflow(workflowUUID, this.#getQueueByName(internalStatus.queueName));
}
await this.systemDatabase.recordWorkflowError(workflowUUID, internalStatus);
// TODO: Log errors, but not in the tests when they're expected.
wCtxt.span.setStatus({ code: SpanStatusCode.ERROR, message: e.message });
Expand All @@ -722,21 +735,34 @@ export class DBOSExecutor implements DBOSExecutorContext {
}
return result;
};
const workflowPromise: Promise<R> = runWorkflow();

// Need to await for the workflow and capture errors.
const awaitWorkflowPromise = workflowPromise
.catch((error) => {
this.logger.debug("Captured error in awaitWorkflowPromise: " + error);
})
.finally(() => {
// Remove itself from pending workflow map.
this.pendingWorkflowMap.delete(workflowUUID);
});
this.pendingWorkflowMap.set(workflowUUID, awaitWorkflowPromise);

// Return the normal handle that doesn't capture errors.
return new InvokedHandle(this.systemDatabase, workflowPromise, workflowUUID, wf.name, callerUUID, callerFunctionID);
if (params.queueName === undefined || params.executeWorkflow) {
const workflowPromise: Promise<R> = runWorkflow();

// Need to await for the workflow and capture errors.
const awaitWorkflowPromise = workflowPromise
.catch((error) => {
this.logger.debug("Captured error in awaitWorkflowPromise: " + error);
})
.finally(() => {
// Remove itself from pending workflow map.
this.pendingWorkflowMap.delete(workflowUUID);
});
this.pendingWorkflowMap.set(workflowUUID, awaitWorkflowPromise);

// Return the normal handle that doesn't capture errors.
return new InvokedHandle(this.systemDatabase, workflowPromise, workflowUUID, wf.name, callerUUID, callerFunctionID);
}
else {
await this.systemDatabase.enqueueWorkflow(workflowUUID, this.#getQueueByName(params.queueName));
return new RetrievedHandle(this.systemDatabase, workflowUUID, callerUUID, callerFunctionID);
}
}

#getQueueByName(name: string): WorkflowQueue {
const q = wfQueueRunner.wfQueuesByName.get(name);
if (!q) throw new DBOSNotRegisteredError(`Workflow queue '${name}' does is not defined.`);
return q;
}

/**
Expand Down Expand Up @@ -915,8 +941,12 @@ export class DBOSExecutor implements DBOSExecutorContext {
const workflowStartUUID = startNewWorkflow ? undefined : workflowUUID;

if (wfInfo) {
return this.workflow(wfInfo.workflow, {
workflowUUID: workflowStartUUID, parentCtx: parentCtx, configuredInstance: configuredInst, recovery: true,
queueName: wfStatus.queueName, executeWorkflow: true
},
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(wfInfo.workflow, { workflowUUID: workflowStartUUID, parentCtx: parentCtx, configuredInstance: configuredInst, recovery: true }, ...inputs);
...inputs);
}

// Should be temporary workflows. Parse the name of the workflow.
Expand Down
2 changes: 1 addition & 1 deletion src/dbos-runtime/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ workflowCommands
.option('-u, --user <string>', 'Retrieve workflows run by this user')
.option('-s, --start-time <string>', 'Retrieve workflows starting after this timestamp (ISO 8601 format)')
.option('-e, --end-time <string>', 'Retrieve workflows starting before this timestamp (ISO 8601 format)')
.option('-S, --status <string>', 'Retrieve workflows with this status (PENDING, SUCCESS, ERROR, RETRIES_EXCEEDED, or CANCELLED)')
.option('-S, --status <string>', 'Retrieve workflows with this status (PENDING, SUCCESS, ERROR, RETRIES_EXCEEDED, ENQUEUED, or CANCELLED)')
.option('-v, --application-version <string>', 'Retrieve workflows with this application version')
.option('--request', 'Retrieve workflow request information')
.option("-d, --appDir <string>", "Specify the application root directory")
Expand Down
7 changes: 7 additions & 0 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import path from 'node:path';
import { Server } from 'http';
import { pathToFileURL } from 'url';
import { DBOSScheduler } from '../scheduler/scheduler';
import { wfQueueRunner } from '../wfqueue';
import { GlobalLogger } from '../telemetry/logs';

interface ModuleExports {
Expand All @@ -30,6 +31,7 @@ export class DBOSRuntime {
private dbosExec: DBOSExecutor | null = null;
private servers: { appServer: Server; adminServer: Server } | undefined;
private scheduler: DBOSScheduler | null = null;
private wfQueueRunner: Promise<void> | null = null;

constructor(dbosConfig: DBOSConfig, private readonly runtimeConfig: DBOSRuntimeConfig) {
// Initialize workflow executor.
Expand All @@ -54,6 +56,9 @@ export class DBOSRuntime {
this.scheduler = new DBOSScheduler(this.dbosExec);
this.scheduler.initScheduler();
this.scheduler.logRegisteredSchedulerEndpoints();
wfQueueRunner.logRegisteredEndpoints(this.dbosExec);
this.wfQueueRunner = wfQueueRunner.dispatchLoop(this.dbosExec);

for (const evtRcvr of this.dbosExec.eventReceivers) {
await evtRcvr.initialize(this.dbosExec);
}
Expand Down Expand Up @@ -114,6 +119,8 @@ export class DBOSRuntime {
*/
async destroy() {
await this.scheduler?.destroyScheduler();
wfQueueRunner.stop();
await this.wfQueueRunner;
for (const evtRcvr of this.dbosExec?.eventReceivers || []) {
await evtRcvr.destroy();
}
Expand Down
11 changes: 6 additions & 5 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { DBOSJSON } from "../utils";
import { StoredProcedure, StoredProcedureContextImpl } from "../procedure";
import { PoolClient } from "pg";
import assert from "node:assert";
import { WorkflowQueue } from "../wfqueue";

interface RecordedResult<R> {
output: R;
Expand Down Expand Up @@ -302,15 +303,15 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
* Generate a proxy object for the provided class that wraps direct calls (i.e. OpClass.someMethod(param))
* to use WorkflowContext.Transaction(OpClass.someMethod, param);
*/
proxyInvokeWF<T extends object>(object: T, workflowUUID: string | undefined, asyncWf: boolean, configuredInstance: ConfiguredInstance | null):
proxyInvokeWF<T extends object>(object: T, workflowUUID: string | undefined, asyncWf: boolean, configuredInstance: ConfiguredInstance | null, queue?: WorkflowQueue):
WfInvokeWfsAsync<T> {
const ops = getRegisteredOperations(object);
const proxy: Record<string, unknown> = {};

const funcId = this.functionIDGetIncrement();
const childUUID = workflowUUID || (this.workflowUUID + "-" + funcId);

const params = { workflowUUID: childUUID, parentCtx: this, configuredInstance };
const params = { workflowUUID: childUUID, parentCtx: this, configuredInstance, queueName: queue?.name };


for (const op of ops) {
Expand All @@ -328,12 +329,12 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return proxy as WfInvokeWfsAsync<T>;
}

startWorkflow<T extends object>(target: T, workflowUUID?: string): WfInvokeWfsAsync<T> {
startWorkflow<T extends object>(target: T, workflowUUID?: string, queue?: WorkflowQueue): WfInvokeWfsAsync<T> {
if (typeof target === 'function') {
return this.proxyInvokeWF(target, workflowUUID, true, null) as unknown as WfInvokeWfsAsync<T>;
return this.proxyInvokeWF(target, workflowUUID, true, null, queue) as unknown as WfInvokeWfsAsync<T>;
}
else {
return this.proxyInvokeWF(target, workflowUUID, true, target as ConfiguredInstance) as unknown as WfInvokeWfsAsync<T>;
return this.proxyInvokeWF(target, workflowUUID, true, target as ConfiguredInstance, queue) as unknown as WfInvokeWfsAsync<T>;
}
}
invokeWorkflow<T extends object>(target: T, workflowUUID?: string): WfInvokeWfs<T> {
Expand Down
11 changes: 11 additions & 0 deletions src/foundationdb/fdb_system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as fdb from "foundationdb";
import { DBOSWorkflowConflictUUIDError } from "../error";
import { NativeValue } from "foundationdb/dist/lib/native";
import { DBOSJSON, sleepms } from "../utils";
import { WorkflowQueue } from "../wfqueue";

interface OperationOutput<R> {
output: R;
Expand Down Expand Up @@ -410,4 +411,14 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
getWorkflows(_input: GetWorkflowsInput): Promise<GetWorkflowsOutput> {
throw new Error("Method not implemented.");
}

enqueueWorkflow(_workflowId: string, _queue: WorkflowQueue): Promise<void> {
throw new Error("Method not implemented.");
}
dequeueWorkflow(_workflowId: string, _queue: WorkflowQueue): Promise<void> {
throw new Error("Method not implemented.");
}
findAndMarkStartableWorkflows(_queue: WorkflowQueue): Promise<string[]> {
throw new Error("Method not implemented.");
}
}
26 changes: 15 additions & 11 deletions src/httpServer/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { v4 as uuidv4 } from 'uuid';
import { Communicator } from "../communicator";
import { APITypes, ArgSources } from "./handlerTypes";
import { StoredProcedure } from "../procedure";
import { WorkflowQueue } from "../wfqueue";

// local type declarations for workflow functions
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -52,8 +53,9 @@ export interface HandlerContext extends DBOSContext {
invoke<T extends object>(targetClass: T, workflowUUID?: string): InvokeFuncs<T>;
invokeWorkflow<T extends ConfiguredInstance>(targetCfg: T, workflowUUID?: string): SyncHandlerWfFuncsInst<T>;
invokeWorkflow<T extends object>(targetClass: T, workflowUUID?: string): SyncHandlerWfFuncs<T>;
startWorkflow<T extends ConfiguredInstance>(targetCfg: T, workflowUUID?: string): AsyncHandlerWfFuncInst<T>;
startWorkflow<T extends object>(targetClass: T, workflowUUID?: string): AsyncHandlerWfFuncs<T>;
startWorkflow<T extends ConfiguredInstance>(targetCfg: T, workflowUUID?: string, queue?: WorkflowQueue): AsyncHandlerWfFuncInst<T>;
startWorkflow<T extends object>(targetClass: T, workflowUUID?: string, queue?: WorkflowQueue): AsyncHandlerWfFuncs<T>;

retrieveWorkflow<R>(workflowUUID: string): WorkflowHandle<R>;
send<T>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void>;
getEvent<T>(workflowUUID: string, key: string, timeoutSeconds?: number): Promise<T | null>;
Expand Down Expand Up @@ -143,10 +145,12 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
* Generate a proxy object for the provided class that wraps direct calls (i.e. OpClass.someMethod(param))
* to use WorkflowContext.Transaction(OpClass.someMethod, param);
*/
mainInvoke<T extends object>(object: T, workflowUUID: string | undefined, asyncWf: boolean, configuredInstance: ConfiguredInstance | null): InvokeFuncs<T> {
mainInvoke<T extends object>(object: T, workflowUUID: string | undefined, asyncWf: boolean, configuredInstance: ConfiguredInstance | null, queue?: WorkflowQueue)
: InvokeFuncs<T>
{
const ops = getRegisteredOperations(object);
const proxy: Record<string, unknown> = {};
const params = { workflowUUID: workflowUUID, parentCtx: this, configuredInstance };
const params = { workflowUUID: workflowUUID, parentCtx: this, configuredInstance, queueName: queue?.name };

for (const op of ops) {
if (asyncWf) {
Expand Down Expand Up @@ -177,31 +181,31 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex

invoke<T extends object>(object: T | ConfiguredInstance, workflowUUID?: string): InvokeFuncs<T> | InvokeFuncsInst<T> {
if (typeof object === 'function') {
return this.mainInvoke(object, workflowUUID, true, null);
return this.mainInvoke(object, workflowUUID, true, null, undefined);
}
else {
const targetInst = object as ConfiguredInstance;
return this.mainInvoke(targetInst, workflowUUID, true, targetInst) as unknown as InvokeFuncsInst<T>;
return this.mainInvoke(targetInst, workflowUUID, true, targetInst, undefined) as unknown as InvokeFuncsInst<T>;
}
}

startWorkflow<T extends object>(object: T | ConfiguredInstance, workflowUUID?: string): AsyncHandlerWfFuncs<T> | AsyncHandlerWfFuncInst<T> {
startWorkflow<T extends object>(object: T | ConfiguredInstance, workflowUUID?: string, queue?: WorkflowQueue): AsyncHandlerWfFuncs<T> | AsyncHandlerWfFuncInst<T> {
if (typeof object === 'function') {
return this.mainInvoke(object, workflowUUID, true, null);
return this.mainInvoke(object, workflowUUID, true, null, queue);
}
else {
const targetInst = object as ConfiguredInstance;
return this.mainInvoke(targetInst, workflowUUID, true, targetInst) as unknown as AsyncHandlerWfFuncInst<T>;
return this.mainInvoke(targetInst, workflowUUID, true, targetInst, queue) as unknown as AsyncHandlerWfFuncInst<T>;
}
}

invokeWorkflow<T extends object>(object: T | ConfiguredInstance, workflowUUID?: string): SyncHandlerWfFuncs<T> | SyncHandlerWfFuncsInst<T> {
if (typeof object === 'function') {
return this.mainInvoke(object, workflowUUID, false, null) as unknown as SyncHandlerWfFuncs<T>;
return this.mainInvoke(object, workflowUUID, false, null, undefined) as unknown as SyncHandlerWfFuncs<T>;
}
else {
const targetInst = object as ConfiguredInstance;
return this.mainInvoke(targetInst, workflowUUID, false, targetInst) as unknown as SyncHandlerWfFuncsInst<T>;
return this.mainInvoke(targetInst, workflowUUID, false, targetInst, undefined) as unknown as SyncHandlerWfFuncsInst<T>;
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,6 @@ export {
DBOSConfig,
} from "./dbos-executor"

export {
WorkflowQueue,
} from "./wfqueue"
4 changes: 0 additions & 4 deletions src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,4 @@ class DetachableLoop {
this.interruptResolve(); // Trigger the interruption
}
}

private sleepms(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
Loading