Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add suspend-resume support #376

Closed
wants to merge 10 commits into from
68 changes: 68 additions & 0 deletions src/durableorchestrationclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,74 @@ export class DurableOrchestrationClient {
}
}

/**
* Suspends a running orchestration instance.
* @param instanceId The ID of the orchestration instance to suspend.
* @param reason The reason for suspending the orchestration instance.
* @returns A promise that resolves when the suspend message is enqueued.
*/
public async suspend(instanceId: string, reason: string): Promise<void> {
const idPlaceholder = this.clientData.managementUrls.id;
let requestUrl: string;
if (this.clientData.rpcBaseUrl) {
// Fast local RPC path
requestUrl = new URL(
`instances/${instanceId}/suspend?reason=${reason}`,
this.clientData.rpcBaseUrl
).href;
} else {
// Legacy app frontend path
requestUrl = this.clientData.managementUrls.suspendPostUri
.replace(idPlaceholder, instanceId)
.replace(this.reasonPlaceholder, reason);
}

const response = await this.axiosInstance.post(requestUrl);
switch (response.status) {
case 202: // suspend accepted
case 410: // instance completed or failed
return;
case 404:
return Promise.reject(new Error(`No instance with ID '${instanceId}' found.`));
default:
return Promise.reject(this.createGenericError(response));
}
}

/**
* Resumes a running orchestration instance.
* @param instanceId The ID of the orchestration instance to resume.
* @param reason The reason for resuming the orchestration instance.
* @returns A promise that resolves when the resume message is enqueued.
*/
public async resume(instanceId: string, reason: string): Promise<void> {
const idPlaceholder = this.clientData.managementUrls.id;
let requestUrl: string;
if (this.clientData.rpcBaseUrl) {
// Fast local RPC path
requestUrl = new URL(
`instances/${instanceId}/resume?reason=${reason}`,
this.clientData.rpcBaseUrl
).href;
} else {
// Legacy app frontend path
requestUrl = this.clientData.managementUrls.resumePostUri
.replace(idPlaceholder, instanceId)
.replace(this.reasonPlaceholder, reason);
}

const response = await this.axiosInstance.post(requestUrl);
switch (response.status) {
case 202: // resume accepted
case 410: // instance completed or failed
return;
case 404:
return Promise.reject(new Error(`No instance with ID '${instanceId}' found.`));
default:
return Promise.reject(this.createGenericError(response));
}
}

/**
* Creates an HTTP response which either contains a payload of management
* URLs for a non-completed instance or contains the payload containing
Expand Down
2 changes: 2 additions & 0 deletions src/history/historyeventtype.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ export enum HistoryEventType {
ContinueAsNew = 16,
GenericEvent = 17,
HistoryState = 18,
ExecutionSuspended = 19,
ExecutionResumed = 20,
}
6 changes: 5 additions & 1 deletion src/httpmanagementpayload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ export class HttpManagementPayload {
/** The HTTP POST instance rewind endpoint. */
public readonly rewindPostUri: string,
/** The HTTP DELETE purge endpoint. */
public readonly purgeHistoryDeleteUri: string
public readonly purgeHistoryDeleteUri: string,
/** The HTTP POST instance suspension endpoint. */
public readonly suspendPostUri: string,
/** The HTTP POST instance resumption endpoint. */
public readonly resumePostUri: string
) {}
}
5 changes: 5 additions & 0 deletions src/orchestrationruntimestatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ export enum OrchestrationRuntimeStatus {
* running.
*/
Pending = "Pending",

/**
* The orchestration instance was suspended.
*/
Suspended = "Suspended",
}
2 changes: 1 addition & 1 deletion test/testobjects/TestOrchestrations.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as df from "../../src";

export class TestOrchestrations {
public static NotGenerator: any = df.orchestrator(function* (context: any) {
public static NotGenerator: any = df.orchestrator(function* () {
return "Hello";
});

Expand Down
2 changes: 2 additions & 0 deletions test/testobjects/testconstants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ export class TestConstants {
public static readonly terminatePostUriTemplate: string = `${TestConstants.hostPlaceholder}${TestConstants.webhookPath}instances/${TestConstants.idPlaceholder}/terminate?reason=${TestConstants.reasonPlaceholder}&${TestConstants.uriSuffix}`; // tslint:disable-line max-line-length
public static readonly rewindPostUriTemplate: string = `${TestConstants.hostPlaceholder}${TestConstants.webhookPath}instances/${TestConstants.idPlaceholder}/rewind?reason=${TestConstants.reasonPlaceholder}&${TestConstants.uriSuffix}`; // tslint:disable-line max-line-length
public static readonly purgeDeleteUriTemplate: string = `${TestConstants.hostPlaceholder}${TestConstants.webhookPath}instances/${TestConstants.idPlaceholder}?${TestConstants.uriSuffix}`; // tslint:disable-line max-line-length
public static readonly suspendPostUriTemplate: string = `${TestConstants.hostPlaceholder}${TestConstants.webhookPath}instances/${TestConstants.idPlaceholder}/suspend?reason=${TestConstants.reasonPlaceholder}&${TestConstants.uriSuffix}`; // tslint:disable-line max-line-length
public static readonly resumePostUriTemplate: string = `${TestConstants.hostPlaceholder}${TestConstants.webhookPath}instances/${TestConstants.idPlaceholder}/resume?reason=${TestConstants.reasonPlaceholder}&${TestConstants.uriSuffix}`; // tslint:disable-line max-line-length

public static readonly createPostUriTemplate: string = `${TestConstants.hostPlaceholder}${TestConstants.webhookPath}orchestrators/${TestConstants.functionPlaceholder}${TestConstants.idPlaceholder}?${TestConstants.testCode}`; // tslint:disable-line max-line-length
public static readonly waitOnPostUriTemplate: string = `${TestConstants.hostPlaceholder}${TestConstants.webhookPath}orchestrators/${TestConstants.functionPlaceholder}${TestConstants.idPlaceholder}?timeout=${TestConstants.timeoutPlaceholder}&pollingInterval=${TestConstants.intervalPlaceholder}&${TestConstants.testCode}`; // tslint:disable-line max-line-length
Expand Down
10 changes: 10 additions & 0 deletions test/testobjects/testutils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ export class TestUtils {
.replace(TestConstants.taskHubPlaceholder, taskHub)
.replace(TestConstants.connectionPlaceholder, connection),
TestConstants.purgeDeleteUriTemplate
.replace(TestConstants.hostPlaceholder, host)
.replace(TestConstants.idPlaceholder, id)
.replace(TestConstants.taskHubPlaceholder, taskHub)
.replace(TestConstants.connectionPlaceholder, connection),
TestConstants.suspendPostUriTemplate
.replace(TestConstants.hostPlaceholder, host)
.replace(TestConstants.idPlaceholder, id)
.replace(TestConstants.taskHubPlaceholder, taskHub)
.replace(TestConstants.connectionPlaceholder, connection),
TestConstants.resumePostUriTemplate
.replace(TestConstants.hostPlaceholder, host)
.replace(TestConstants.idPlaceholder, id)
.replace(TestConstants.taskHubPlaceholder, taskHub)
Expand Down
58 changes: 52 additions & 6 deletions test/unit/durableclient-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const durableClientBindingInputJson = JSON.stringify({
terminatePostUri: `${externalBaseUrl}/instances/INSTANCEID/?taskHub=${testTaskHubName}&connection=${testConnectionName}`,
rewindPostUri: `${externalBaseUrl}/instances/INSTANCEID/?taskHub=${testTaskHubName}&connection=${testConnectionName}`,
purgeHistoryDeleteUri: `${externalBaseUrl}/instances/INSTANCEID/?taskHub=${testTaskHubName}&connection=${testConnectionName}`,
suspendPostUri: `${externalBaseUrl}/instances/INSTANCEID/?taskHub=${testTaskHubName}&connection=${testConnectionName}`,
resumePostUri: `${externalBaseUrl}/instances/INSTANCEID/?taskHub=${testTaskHubName}&connection=${testConnectionName}`,
},
baseUrl: externalBaseUrl,
rpcBaseUrl: testRpcBaseUrl,
Expand Down Expand Up @@ -91,6 +93,8 @@ describe("Durable client RPC endpoint", () => {
expect(payload.sendEventPostUri).to.startWith(externalBaseUrl);
expect(payload.statusQueryGetUri).to.startWith(externalBaseUrl);
expect(payload.terminatePostUri).to.startWith(externalBaseUrl);
expect(payload.suspendPostUri).to.startWith(externalBaseUrl);
expect(payload.resumePostUri).to.startWith(externalBaseUrl);
});
});

Expand Down Expand Up @@ -218,11 +222,11 @@ describe("Durable client RPC endpoint", () => {
const input = JSON.parse(durableClientBindingInputJson) as OrchestrationClientInputData;
const client = new DurableOrchestrationClient(input);

// The getStatusBy() method should do a GET to http://127.0.0.1:17071/durabletask/instances/?createdTimeFrom=2020-01-01T00:00:00Z&createdTimeTo=2020-01-01T23:59:59Z&runtimeStatus=Pending,Running,Completed,Terminated,Failed
// The getStatusBy() method should do a GET to http://127.0.0.1:17071/durabletask/instances/?createdTimeFrom=2020-01-01T00:00:00Z&createdTimeTo=2020-01-01T23:59:59Z&runtimeStatus=Pending,Running,Completed,Terminated,Failed,Suspended
const expectedUrl = new URL(`${testRpcOrigin}/durabletask/instances/`);
const createdTimeFrom = "2020-01-01T00:00:00.000Z";
const createdTimeTo = "2020-01-01T23:59:59.000Z";
const runtimeStatus = "Pending,Running,Completed,Terminated,Failed";
const runtimeStatus = "Pending,Running,Completed,Terminated,Failed,Suspended";

const scope = nock(expectedUrl.origin)
.get(expectedUrl.pathname)
Expand Down Expand Up @@ -250,11 +254,11 @@ describe("Durable client RPC endpoint", () => {
const input = JSON.parse(durableClientBindingInputJson) as OrchestrationClientInputData;
const client = new DurableOrchestrationClient(input);

// The getStatusBy() method should do a GET to http://127.0.0.1:17071/durabletask/instances/?createdTimeFrom=2020-01-01T00:00:00Z&createdTimeTo=2020-01-01T23:59:59Z&runtimeStatus=Pending,Running,Completed,Terminated,Failed
// The getStatusBy() method should do a GET to http://127.0.0.1:17071/durabletask/instances/?createdTimeFrom=2020-01-01T00:00:00Z&createdTimeTo=2020-01-01T23:59:59Z&runtimeStatus=Pending,Running,Completed,Terminated,Failed,Suspended
const expectedUrl = new URL(`${testRpcOrigin}/durabletask/instances/`);
const createdTimeFrom = "2020-01-01T00:00:00.000Z";
const createdTimeTo = "2020-01-01T23:59:59.000Z";
const runtimeStatus = "Pending,Running,Completed,Terminated,Failed";
const runtimeStatus = "Pending,Running,Completed,Terminated,Failed,Suspended";

// create dummy orchestration status for response
const dummyDate = new Date();
Expand Down Expand Up @@ -349,11 +353,11 @@ describe("Durable client RPC endpoint", () => {
const client = new DurableOrchestrationClient(input);

// The purgeInstanceHistoryBy() method should do a DELETE to
// http://127.0.0.1:17071/durabletask/instances/?createdTimeFrom=2020-01-01T00:00:00Z&createdTimeTo=2020-01-01T23:59:59Z&runtimeStatus=Pending,Running,Completed,Terminated,Failed
// http://127.0.0.1:17071/durabletask/instances/?createdTimeFrom=2020-01-01T00:00:00Z&createdTimeTo=2020-01-01T23:59:59Z&runtimeStatus=Pending,Running,Completed,Terminated,Failed,Suspended
const expectedUrl = new URL(`${testRpcOrigin}/durabletask/instances/`);
const createdTimeFrom = "2020-01-01T00:00:00.000Z";
const createdTimeTo = "2020-01-01T23:59:59.000Z";
const runtimeStatus = "Pending,Running,Completed,Terminated,Failed";
const runtimeStatus = "Pending,Running,Completed,Terminated,Failed,Suspended";

const scope = nock(expectedUrl.origin)
.delete(expectedUrl.pathname)
Expand Down Expand Up @@ -512,4 +516,46 @@ describe("Durable client RPC endpoint", () => {
expect(result.entityState).to.deep.equal(expectedEntityState);
});
});

describe("suspend()", () => {
it("uses the RPC endpoint", async () => {
const input = JSON.parse(durableClientBindingInputJson) as OrchestrationClientInputData;
const client = new DurableOrchestrationClient(input);

// The suspend() method should do a POST to http://127.0.0.1:17071/durabletask/instances/abc123/suspend?reason=because
const instanceId = "abc123";
const reason = "because";
const expectedUrl = new URL(
`${testRpcOrigin}/durabletask/instances/${instanceId}/suspend?reason=${reason}`
);

const scope = nock(expectedUrl.origin)
.post(expectedUrl.pathname + expectedUrl.search)
.reply(202);

await client.suspend(instanceId, reason);
expect(scope.isDone()).to.be.equal(true);
});
});

describe("resume()", () => {
it("uses the RPC endpoint", async () => {
const input = JSON.parse(durableClientBindingInputJson) as OrchestrationClientInputData;
const client = new DurableOrchestrationClient(input);

// The resume() method should do a POST to http://127.0.0.1:17071/durabletask/instances/abc123/resume?reason=because
const instanceId = "abc123";
const reason = "because";
const expectedUrl = new URL(
`${testRpcOrigin}/durabletask/instances/${instanceId}/resume?reason=${reason}`
);

const scope = nock(expectedUrl.origin)
.post(expectedUrl.pathname + expectedUrl.search)
.reply(202);

await client.resume(instanceId, reason);
expect(scope.isDone()).to.be.equal(true);
});
});
});
Loading