Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deactivate #647

Merged
merged 15 commits into from
Oct 23, 2024
14 changes: 14 additions & 0 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ import { DBOSEventReceiver, DBOSExecutorContext} from ".";
import { get } from "lodash";
import { wfQueueRunner, WorkflowQueue } from "./wfqueue";
import { debugTriggerPoint, DEBUG_TRIGGER_WORKFLOW_ENQUEUE } from "./debugpoint";
import { DBOSScheduler } from './scheduler/scheduler';
import { DBOSEventReceiverState, DBOSEventReceiverQuery, DBNotificationCallback, DBNotificationListener } from "./eventreceiver";


// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface DBOSNull { }
export const dbosNull: DBOSNull = {};
Expand Down Expand Up @@ -164,6 +166,8 @@ export class DBOSExecutor implements DBOSExecutorContext {

eventReceivers: DBOSEventReceiver[] = [];

scheduler: DBOSScheduler | null = null;

/* WORKFLOW EXECUTOR LIFE CYCLE MANAGEMENT */
constructor(readonly config: DBOSConfig, systemDatabase?: SystemDatabase) {
this.debugMode = config.debugMode ?? false;
Expand Down Expand Up @@ -226,6 +230,7 @@ export class DBOSExecutor implements DBOSExecutorContext {
this.initialized = false;
}


configureDbClient() {
const userDbClient = this.config.userDbclient;
const userDBConfig = this.config.poolConfig;
Expand Down Expand Up @@ -961,6 +966,15 @@ export class DBOSExecutor implements DBOSExecutorContext {
return handlerArray;
}

async deactivateEventReceivers() {
this.logger.info("Deactivating event receivers");
for (const evtRcvr of this.eventReceivers || []) {
await evtRcvr.destroy();
}
await this.scheduler?.destroyScheduler();
wfQueueRunner.stop();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also shut down the queue polling loop, or does that need to be done elsewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you are referring to the WF Queue runner. Will add the code to shut that down as well


async executeWorkflowUUID(workflowUUID: string, startNewWorkflow: boolean = false): Promise<WorkflowHandle<unknown>> {
const wfStatus = await this.systemDatabase.getWorkflowStatus(workflowUUID);
const inputs = await this.systemDatabase.getWorkflowInputs(workflowUUID);
Expand Down
1 change: 1 addition & 0 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export class DBOSRuntime {
*/
async initAndStart() {
try {

this.dbosExec = new DBOSExecutor(this.dbosConfig);
DBOS.globalLogger = this.dbosExec.logger;
this.dbosExec.logger.debug(`Loading classes from entrypoints ${JSON.stringify(this.runtimeConfig.entrypoints)}`);
Expand Down
18 changes: 18 additions & 0 deletions src/httpServer/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const WorkflowUUIDHeader = "dbos-idempotency-key";
export const WorkflowRecoveryUrl = "/dbos-workflow-recovery"
export const HealthUrl = "/dbos-healthz"
export const PerfUrl = "/dbos-perf"
export const DeactivateUrl = "/deactivate"

export class DBOSHttpServer {
readonly app: Koa;
Expand All @@ -55,6 +56,7 @@ export class DBOSHttpServer {
DBOSHttpServer.registerHealthEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerRecoveryEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerPerfEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerDeactivateEndpoint(this.dbosExec, this.adminRouter);
this.adminApp.use(this.adminRouter.routes()).use(this.adminRouter.allowedMethods());
DBOSHttpServer.registerDecoratedEndpoints(this.dbosExec, this.applicationRouter, this.app);
this.app.use(this.applicationRouter.routes()).use(this.applicationRouter.allowedMethods());
Expand Down Expand Up @@ -169,6 +171,22 @@ async checkPortAvailability(port: number, host: string): Promise<void> {
dbosExec.logger.debug(`DBOS Server Registered Perf GET ${HealthUrl}`);
}

/**
* Register Deactiviate endpoint.
* Deactivate consumers so that they don'nt start new workflows.
*
*/
static registerDeactivateEndpoint(dbosExec: DBOSExecutor, router: Router) {
const deactivateHandler = async (koaCtxt: Koa.Context, koaNext: Koa.Next) => {
await dbosExec.deactivateEventReceivers();
dbosExec.logger.info("Deactivating Event Recievers");
koaCtxt.body = "Deactivated";
await koaNext();
};
router.get(DeactivateUrl, deactivateHandler);
dbosExec.logger.info(`DBOS Server deactivate GET ${DeactivateUrl}`);
}

/**
* Register decorated functions as HTTP endpoints.
*/
Expand Down
4 changes: 3 additions & 1 deletion src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ export function Scheduled(schedulerConfig: SchedulerConfig) {
///////////////////////////

export class DBOSScheduler{
constructor(readonly dbosExec: DBOSExecutor) {}
constructor(readonly dbosExec: DBOSExecutor) {
dbosExec.scheduler = this;
}

schedLoops: DetachableLoop[] = [];
schedTasks: Promise<void> [] = [];
Expand Down
24 changes: 13 additions & 11 deletions src/testing/testing_runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export interface TestingRuntime {
dropUserSchema(): Promise<void>; // Only valid if using TypeORM. Drop all tables created by createUserSchema().

destroy(): Promise<void>; // Release resources after tests.
deactivateEventReceivers(): Promise<void>; // Deactivate event receivers.
}

/**
Expand Down Expand Up @@ -129,25 +130,26 @@ export class TestingRuntimeImpl implements TestingRuntime {
async destroy() {
// Only release once.
if (this.#isInitialized) {
try {
wfQueueRunner.stop();
await this.#wfQueueRunner;
}
catch (err) {
const e = err as Error;
this.#server?.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`);
}
await this.#scheduler?.destroyScheduler();
await this.destroyEventReceivers();
await this.deactivateEventReceivers();
await this.#server?.dbosExec.destroy();
this.#isInitialized = false;
}
}

async destroyEventReceivers() {
async deactivateEventReceivers() {
for (const evtRcvr of this.#server?.dbosExec?.eventReceivers || []) {
await evtRcvr.destroy();
}
await this.#scheduler?.destroyScheduler();
try {
wfQueueRunner.stop();
await this.#wfQueueRunner;
}
catch (err) {
const e = err as Error;
this.#server?.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`);
}

}

/**
Expand Down
5 changes: 5 additions & 0 deletions tests/scheduler/scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ describe("scheduled-wf-tests-simple", () => {
expect(DBOSSchedTestClass.nTooEarly).toBe(0);
expect(DBOSSchedTestClass.nTooLate).toBe(0);

await testRuntime.deactivateEventReceivers();

await sleepms(1000);
expect(DBOSSchedTestClass.nCalls).toBeLessThanOrEqual(3);

});
});

Expand Down