Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infrastructure for DB trigger event receiver #644

Merged
merged 4 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions migrations/20241009150000_event_dispatch_kv.js
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this separate table, or can we simply use the queue?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, you can't use the queue because it doesn't provide catch-up state.
Also, I want rid of the queue table, I think.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more thorough explanation.

All the event receivers I've looked at need state about how far through the source they've gotten. cron is keeping the time, kafka has topic offsets*, db triggers may need the table timestamp also (depending on the application). This table is for recording that. It is not always possible or efficient to consult the workflow status table to rebuild this. The queue table is getting gc'd so I'd not want to rely on that either.

*I predict that we will eventually learn the limitations of letting the kafka side keep these, and want to take control over it... I've seen the ending of that movie before.

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
exports.up = function(knex) {
return knex.schema.withSchema('dbos')
.createTable('event_dispatch_kv', function(table) {
table.text('service_name').notNullable();
table.text('workflow_fn_name').notNullable();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to make sure the workflow name contains the class name

table.text('key').notNullable();
table.text('value');
table.decimal('update_seq', 38, 0);
table.decimal('update_time', 38, 15);
table.primary(['service_name','workflow_fn_name','key']);
})
};

exports.down = function(knex) {
return knex.schema.withSchema('dbos')
.dropTableIfExists('event_dispatch_kv');
};
13 changes: 10 additions & 3 deletions schemas/system_db_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,16 @@ export interface workflow_inputs {
inputs: string;
}

export interface scheduler_state {
export interface event_dispatch_kv {
// Key fields
service_name: string;
workflow_fn_name: string;
last_run_time: number; // Time that has certainly been kicked off; others may have but OAOO will cover that
key: string;

// Payload fields
value?: string;
update_time?: number; // Timestamp of record (for upsert)
update_seq?: bigint; // Sequence number of record (for upsert)
}

export interface workflow_queue {
Expand All @@ -50,4 +57,4 @@ export interface workflow_queue {
created_at_epoch_ms: number; // This time is provided by the database
started_at_epoch_ms?: number; // This time is provided by the client
completed_at_epoch_ms?: number; // This time is provided by the client
}
}
44 changes: 44 additions & 0 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import { DBOSEventReceiver, DBOSExecutorContext} from ".";
import { get } from "lodash";
import { wfQueueRunner, WorkflowQueue } from "./wfqueue";
import { debugTriggerPoint, DEBUG_TRIGGER_WORKFLOW_ENQUEUE } from "./debugpoint";
import { DBOSEventReceiverState, DBOSEventReceiverQuery, DBNotificationCallback, DBNotificationListener } from "./eventreceiver";

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface DBOSNull { }
Expand Down Expand Up @@ -896,6 +897,38 @@ export class DBOSExecutor implements DBOSExecutorContext {
return new RetrievedHandle(this.systemDatabase, workflowUUID);
}

async queryUserDB(sql: string, params?: unknown[]) {
if (params !== undefined) {
return await this.userDatabase.query(sql, ...params);
}
else {
return await this.userDatabase.query(sql);
}
}

async userDBListen(channels: string[], callback: DBNotificationCallback): Promise<DBNotificationListener> {
const notificationsClient = await this.procedurePool.connect();
for (const nname of channels) {
await notificationsClient.query(`LISTEN ${nname};`);
}

notificationsClient.on("notification", callback);

return {
close: async () => {
for (const nname of channels) {
try {
await notificationsClient.query(`UNLISTEN ${nname};`);
}
catch(e) {
this.logger.warn(e);
}
notificationsClient.release();
}
}
}
}

/* INTERNAL HELPERS */
#generateUUID(): string {
return uuidv4();
Expand Down Expand Up @@ -1006,6 +1039,17 @@ export class DBOSExecutor implements DBOSExecutorContext {
return this.workflow(temp_workflow, { workflowUUID: workflowStartUUID, parentCtx: parentCtx ?? undefined, configuredInstance: clsinst, recovery: true, tempWfType, tempWfClass, tempWfName}, ...inputs);
}

async getEventDispatchState(svc: string, wfn: string, key: string): Promise<DBOSEventReceiverState | undefined> {
return await this.systemDatabase.getEventDispatchState(svc, wfn, key);
}
async queryEventDispatchState(query: DBOSEventReceiverQuery): Promise<DBOSEventReceiverState[]> {
return await this.systemDatabase.queryEventDispatchState(query);
}
async upsertEventDispatchState(state: DBOSEventReceiverState): Promise<DBOSEventReceiverState> {
return await this.systemDatabase.upsertEventDispatchState(state);
}


// NOTE: this creates a new span, it does not inherit the span from the original workflow
#getRecoveryContext(workflowUUID: string, status: WorkflowStatus): DBOSContextImpl {
const span = this.tracer.startSpan(status.workflowName, {
Expand Down
1 change: 1 addition & 0 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class DBOSRuntime {
this.scheduler = new DBOSScheduler(this.dbosExec);
this.scheduler.initScheduler();
this.scheduler.logRegisteredSchedulerEndpoints();

wfQueueRunner.logRegisteredEndpoints(this.dbosExec);
this.wfQueueRunner = wfQueueRunner.dispatchLoop(this.dbosExec);

Expand Down
4 changes: 3 additions & 1 deletion src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,12 @@ export interface MethodRegistrationBase {
procConfig?: TransactionConfig;
isInstance: boolean;

eventReceiverInfo: Map<DBOSEventReceiver, unknown>;
eventReceiverInfo: Map<DBOSEventReceiver, unknown>;

// eslint-disable-next-line @typescript-eslint/ban-types
registeredFunction: Function | undefined;
// eslint-disable-next-line @typescript-eslint/ban-types
origFunction: Function;

invoke(pthis: unknown, args: unknown[]): unknown;
}
Expand Down
55 changes: 54 additions & 1 deletion src/eventreceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ import { WorkflowFunction, WorkflowHandle, WorkflowParams } from './workflow';
import { TransactionFunction } from './transaction';
import { MethodRegistrationBase } from './decorators';
import { StepFunction } from './step';
import { Notification } from "pg";

export type DBNotification = Notification;
export type DBNotificationCallback = (n: DBNotification) => void;
export interface DBNotificationListener {
close(): Promise<void>;
}

export interface DBOSEventReceiverRegistration {
methodConfig: unknown,
classConfig: unknown,
methodReg: MethodRegistrationBase
}

/*
* Info provided to an event receiver at initialization,
Expand All @@ -26,7 +39,7 @@ export interface DBOSExecutorContext
* classConfig: the class info the receiver stored
* methodReg: the method registration (w/ workflow, transaction, function, and other info)
*/
getRegistrationsFor(eri: DBOSEventReceiver) : {methodConfig: unknown, classConfig: unknown, methodReg: MethodRegistrationBase}[];
getRegistrationsFor(eri: DBOSEventReceiver) : DBOSEventReceiverRegistration[];

transaction<T extends unknown[], R>(txn: TransactionFunction<T, R>, params: WorkflowParams, ...args: T): Promise<R>;
workflow<T extends unknown[], R>(wf: WorkflowFunction<T, R>, params: WorkflowParams, ...args: T): Promise<WorkflowHandle<R>>;
Expand All @@ -35,6 +48,25 @@ export interface DBOSExecutorContext
send<T>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void>;
getEvent<T>(workflowUUID: string, key: string, timeoutSeconds: number): Promise<T | null>;
retrieveWorkflow<R>(workflowUUID: string): WorkflowHandle<R>;

// Event receiver state queries / updates
/*
* An event dispatcher may keep state in the system database
* The 'service' should be unique to the event receiver keeping state, to separate from others
* The 'workflowFnName' workflow function name should be the fully qualified / unique function name dispatched
* The 'key' field allows multiple records per service / workflow function
* The service+workflowFnName+key uniquely identifies the record, which is associated with:
* 'value' - a value set by the event receiver service; this string may be a JSON to keep complex details
* A version, either as a sequence number (long integer), or as a time (high precision floating point).
* If versions are in use, any upsert is discarded if the version field is less than what is already stored.
* The upsert returns the current record, which is useful if it is more recent.
*/
getEventDispatchState(service: string, workflowFnName: string, key: string): Promise<DBOSEventReceiverState | undefined>;
upsertEventDispatchState(state: DBOSEventReceiverState): Promise<DBOSEventReceiverState>;

queryUserDB(sql: string, params?: unknown[]): Promise<unknown[]>;

userDBListen(channels: string[], callback: DBNotificationCallback): Promise<DBNotificationListener>;
}

/*
Expand All @@ -51,4 +83,25 @@ export interface DBOSEventReceiver
destroy() : Promise<void>;
initialize(executor: DBOSExecutorContext) : Promise<void>;
logRegisteredEndpoints() : void;
}

export interface DBOSEventReceiverState
{
service: string;
workflowFnName: string;
key: string;
value?: string;
updateTime?: number;
updateSeq?: bigint;
}

export interface DBOSEventReceiverQuery
{
service?: string;
workflowFnName?: string;
key?: string;
startTime?: number;
endTime?: number;
startSeq?: bigint;
endSeq?: bigint;
}
16 changes: 10 additions & 6 deletions src/foundationdb/fdb_system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { DBOSWorkflowConflictUUIDError } from "../error";
import { NativeValue } from "foundationdb/dist/lib/native";
import { DBOSJSON, sleepms } from "../utils";
import { WorkflowQueue } from "../wfqueue";
import { DBOSEventReceiverState, DBOSEventReceiverQuery } from "../eventreceiver";

interface OperationOutput<R> {
output: R;
Expand Down Expand Up @@ -400,14 +401,17 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
await sleepms(durationMS); // TODO: Implement
}

/* SCHEDULER */
getLastScheduledTime(_wfn: string): Promise<number | null> {
return Promise.resolve(null);
// Event dispatcher queries / updates
async getEventDispatchState(_svc: string, _wfn: string, _key: string): Promise<DBOSEventReceiverState | undefined> {
return Promise.resolve(undefined);
}
setLastScheduledTime(_wfn: string, _invtime: number): Promise<number | null> {
return Promise.resolve(null);
async queryEventDispatchState(_query: DBOSEventReceiverQuery): Promise<DBOSEventReceiverState[]> {
return Promise.resolve([]);
}

async upsertEventDispatchState(state: DBOSEventReceiverState): Promise<DBOSEventReceiverState> {
return Promise.resolve(state);
}

getWorkflows(_input: GetWorkflowsInput): Promise<GetWorkflowsOutput> {
throw new Error("Method not implemented.");
}
Expand Down
5 changes: 5 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ export {

export {
DBOSEventReceiver,
DBOSEventReceiverRegistration,
DBOSExecutorContext,
DBNotification,
DBNotificationListener,
DBOSEventReceiverQuery,
DBOSEventReceiverState,
} from "./eventreceiver";

export {
Expand Down
19 changes: 15 additions & 4 deletions src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WorkflowContext } from "..";
import { DBOSEventReceiverState, WorkflowContext } from "..";
import { DBOSExecutor } from "../dbos-executor";
import { MethodRegistrationBase, registerAndWrapFunction } from "../decorators";
import { TimeMatcher } from "./crontab";
Expand Down Expand Up @@ -97,6 +97,8 @@ export class DBOSScheduler{
}
}

const SCHEDULER_EVENT_SERVICE_NAME = 'dbos.scheduler';

class DetachableLoop {
private isRunning: boolean = false;
private interruptResolve?: () => void;
Expand All @@ -117,9 +119,10 @@ class DetachableLoop {
// See if the exec time is available in durable storage...
if (this.schedMode === SchedulerMode.ExactlyOncePerInterval)
{
const lasttm = await this.dbosExec.systemDatabase.getLastScheduledTime(this.scheduledMethodName);
const lastState = await this.dbosExec.systemDatabase.getEventDispatchState(SCHEDULER_EVENT_SERVICE_NAME, this.scheduledMethodName, 'lastState');
const lasttm = lastState?.value;
if (lasttm) {
this.lastExec = new Date(lasttm);
this.lastExec = new Date(parseFloat(lasttm));
}
}

Expand Down Expand Up @@ -176,7 +179,15 @@ class DetachableLoop {
}

// Record the time of the wf kicked off
const dbTime = await this.dbosExec.systemDatabase.setLastScheduledTime(this.scheduledMethodName, nextExecTime.getTime());
const ers: DBOSEventReceiverState = {
service: SCHEDULER_EVENT_SERVICE_NAME,
workflowFnName: this.scheduledMethodName,
key: 'lastState',
value: `${nextExecTime.getTime()}`,
updateTime: nextExecTime.getTime(),
}
const updRec = await this.dbosExec.systemDatabase.upsertEventDispatchState(ers);
const dbTime = parseFloat(updRec.value!);
if (dbTime && dbTime > nextExecTime.getTime()) nextExecTime.setTime(dbTime);
this.lastExec = nextExecTime;
}
Expand Down
Loading