Skip to content

Commit

Permalink
Deactivate (#647)
Browse files Browse the repository at this point in the history
An admin api to stop scheduler and kafka consumers from processing
events.

Processing will resume only when u restart the app
  • Loading branch information
manojdbos authored Oct 23, 2024
1 parent 00de4f6 commit 9f293e5
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 12 deletions.
14 changes: 14 additions & 0 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ import { DBOSEventReceiver, DBOSExecutorContext} from ".";
import { get } from "lodash";
import { wfQueueRunner, WorkflowQueue } from "./wfqueue";
import { debugTriggerPoint, DEBUG_TRIGGER_WORKFLOW_ENQUEUE } from "./debugpoint";
import { DBOSScheduler } from './scheduler/scheduler';
import { DBOSEventReceiverState, DBOSEventReceiverQuery, DBNotificationCallback, DBNotificationListener } from "./eventreceiver";


// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface DBOSNull { }
export const dbosNull: DBOSNull = {};
Expand Down Expand Up @@ -164,6 +166,8 @@ export class DBOSExecutor implements DBOSExecutorContext {

eventReceivers: DBOSEventReceiver[] = [];

scheduler: DBOSScheduler | null = null;

/* WORKFLOW EXECUTOR LIFE CYCLE MANAGEMENT */
constructor(readonly config: DBOSConfig, systemDatabase?: SystemDatabase) {
this.debugMode = config.debugMode ?? false;
Expand Down Expand Up @@ -226,6 +230,7 @@ export class DBOSExecutor implements DBOSExecutorContext {
this.initialized = false;
}


configureDbClient() {
const userDbClient = this.config.userDbclient;
const userDBConfig = this.config.poolConfig;
Expand Down Expand Up @@ -961,6 +966,15 @@ export class DBOSExecutor implements DBOSExecutorContext {
return handlerArray;
}

async deactivateEventReceivers() {
this.logger.info("Deactivating event receivers");
for (const evtRcvr of this.eventReceivers || []) {
await evtRcvr.destroy();
}
await this.scheduler?.destroyScheduler();
wfQueueRunner.stop();
}

async executeWorkflowUUID(workflowUUID: string, startNewWorkflow: boolean = false): Promise<WorkflowHandle<unknown>> {
const wfStatus = await this.systemDatabase.getWorkflowStatus(workflowUUID);
const inputs = await this.systemDatabase.getWorkflowInputs(workflowUUID);
Expand Down
1 change: 1 addition & 0 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export class DBOSRuntime {
*/
async initAndStart() {
try {

this.dbosExec = new DBOSExecutor(this.dbosConfig);
DBOS.globalLogger = this.dbosExec.logger;
this.dbosExec.logger.debug(`Loading classes from entrypoints ${JSON.stringify(this.runtimeConfig.entrypoints)}`);
Expand Down
18 changes: 18 additions & 0 deletions src/httpServer/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const WorkflowUUIDHeader = "dbos-idempotency-key";
export const WorkflowRecoveryUrl = "/dbos-workflow-recovery"
export const HealthUrl = "/dbos-healthz"
export const PerfUrl = "/dbos-perf"
export const DeactivateUrl = "/deactivate"

export class DBOSHttpServer {
readonly app: Koa;
Expand All @@ -55,6 +56,7 @@ export class DBOSHttpServer {
DBOSHttpServer.registerHealthEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerRecoveryEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerPerfEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerDeactivateEndpoint(this.dbosExec, this.adminRouter);
this.adminApp.use(this.adminRouter.routes()).use(this.adminRouter.allowedMethods());
DBOSHttpServer.registerDecoratedEndpoints(this.dbosExec, this.applicationRouter, this.app);
this.app.use(this.applicationRouter.routes()).use(this.applicationRouter.allowedMethods());
Expand Down Expand Up @@ -169,6 +171,22 @@ async checkPortAvailability(port: number, host: string): Promise<void> {
dbosExec.logger.debug(`DBOS Server Registered Perf GET ${HealthUrl}`);
}

/**
* Register Deactiviate endpoint.
* Deactivate consumers so that they don'nt start new workflows.
*
*/
static registerDeactivateEndpoint(dbosExec: DBOSExecutor, router: Router) {
const deactivateHandler = async (koaCtxt: Koa.Context, koaNext: Koa.Next) => {
await dbosExec.deactivateEventReceivers();
dbosExec.logger.info("Deactivating Event Recievers");
koaCtxt.body = "Deactivated";
await koaNext();
};
router.get(DeactivateUrl, deactivateHandler);
dbosExec.logger.info(`DBOS Server deactivate GET ${DeactivateUrl}`);
}

/**
* Register decorated functions as HTTP endpoints.
*/
Expand Down
4 changes: 3 additions & 1 deletion src/scheduler/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ export function Scheduled(schedulerConfig: SchedulerConfig) {
///////////////////////////

export class DBOSScheduler{
constructor(readonly dbosExec: DBOSExecutor) {}
constructor(readonly dbosExec: DBOSExecutor) {
dbosExec.scheduler = this;
}

schedLoops: DetachableLoop[] = [];
schedTasks: Promise<void> [] = [];
Expand Down
24 changes: 13 additions & 11 deletions src/testing/testing_runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export interface TestingRuntime {
dropUserSchema(): Promise<void>; // Only valid if using TypeORM. Drop all tables created by createUserSchema().

destroy(): Promise<void>; // Release resources after tests.
deactivateEventReceivers(): Promise<void>; // Deactivate event receivers.
}

/**
Expand Down Expand Up @@ -131,25 +132,26 @@ export class TestingRuntimeImpl implements TestingRuntime {
async destroy() {
// Only release once.
if (this.#isInitialized) {
try {
wfQueueRunner.stop();
await this.#wfQueueRunner;
}
catch (err) {
const e = err as Error;
this.#server?.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`);
}
await this.#scheduler?.destroyScheduler();
await this.destroyEventReceivers();
await this.deactivateEventReceivers();
await this.#server?.dbosExec.destroy();
this.#isInitialized = false;
}
}

async destroyEventReceivers() {
async deactivateEventReceivers() {
for (const evtRcvr of this.#server?.dbosExec?.eventReceivers || []) {
await evtRcvr.destroy();
}
await this.#scheduler?.destroyScheduler();
try {
wfQueueRunner.stop();
await this.#wfQueueRunner;
}
catch (err) {
const e = err as Error;
this.#server?.dbosExec?.logger.warn(`Error destroying workflow queue runner: ${e.message}`);
}

}

/**
Expand Down
5 changes: 5 additions & 0 deletions tests/scheduler/scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ describe("scheduled-wf-tests-simple", () => {
expect(DBOSSchedTestClass.nTooEarly).toBe(0);
expect(DBOSSchedTestClass.nTooLate).toBe(0);

await testRuntime.deactivateEventReceivers();

await sleepms(1000);
expect(DBOSSchedTestClass.nCalls).toBeLessThanOrEqual(3);

});
});

Expand Down

0 comments on commit 9f293e5

Please sign in to comment.