From 6c8d45b9aa0160e8b01b1408b28de4daee960542 Mon Sep 17 00:00:00 2001 From: Xavier Geerinck Date: Fri, 4 Aug 2023 18:38:01 -0700 Subject: [PATCH 1/6] add workflow Signed-off-by: Xavier Geerinck --- .github/workflows/test.yaml | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 .github/workflows/test.yaml diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..932ee9c --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,23 @@ +name: 🚀 Test and Build + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: 📥 Checkout code + uses: actions/checkout@v2 + + - name: ⚙️ Install dependencies + run: npm install + + - name: ✅ Run tests + run: npm test From 1cb7af1401feb5e2dd6b77255f0f966b9c66a4c7 Mon Sep 17 00:00:00 2001 From: Xavier Geerinck Date: Fri, 4 Aug 2023 18:40:07 -0700 Subject: [PATCH 2/6] split them up Signed-off-by: Xavier Geerinck --- .github/workflows/test.yaml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 932ee9c..81ced0a 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -19,5 +19,8 @@ jobs: - name: ⚙️ Install dependencies run: npm install - - name: ✅ Run tests - run: npm test + - name: ✅ Run unit tests + run: npm test test/unit + + - name: ✅ Run e2e tests + run: npm test test/e2e From 0be46e6dfa328d02705d033366e5f7af88e213c9 Mon Sep 17 00:00:00 2001 From: Xavier Geerinck Date: Fri, 4 Aug 2023 18:51:16 -0700 Subject: [PATCH 3/6] some fixes Signed-off-by: Xavier Geerinck --- scripts/test-e2e.sh | 14 +++++++++++ src/client.ts | 27 ++++++++++++++++---- src/orchestration/index.ts | 27 +++++++++++++++----- src/worker/activity-executor.ts | 40 +++++++++++++++--------------- src/worker/task-hub-grpc-worker.ts | 16 +++++++++--- test/e2e/orchestration.spec.ts | 9 +++---- 6 files changed, 93 insertions(+), 40 deletions(-) create mode 100755 scripts/test-e2e.sh diff --git a/scripts/test-e2e.sh b/scripts/test-e2e.sh new file mode 100755 index 0000000..667f372 --- /dev/null +++ b/scripts/test-e2e.sh @@ -0,0 +1,14 @@ +#!/bin/bash +echo "Starting Sidecar" +docker run \ + --name durabletask-sidecar -d --rm \ + -p 4001:4001 \ + --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' \ + cgillum/durabletask-sidecar:latest start \ + --backend Emulator + +echo "Running E2E tests" +npm run test test/e2e + +echo "Stopping Sidecar" +docker stop durabletask-sidecar \ No newline at end of file diff --git a/src/client.ts b/src/client.ts index 91ad885..40b1165 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,3 +1,5 @@ +import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb"; +import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb"; import * as pb from "./proto/orchestrator_service_pb"; import * as stubs from "./proto/orchestrator_service_grpc_pb"; import { TOrchestrator } from "./types/orchestrator.type"; @@ -18,7 +20,7 @@ export class TaskHubGrpcClient { } async scheduleNewOrchestration( - orchestrator: TOrchestrator, + orchestrator: TOrchestrator, input?: TInput, instanceId?: string, startAt?: Date, @@ -28,8 +30,15 @@ export class TaskHubGrpcClient { const req = new pb.CreateInstanceRequest(); req.setName(name); req.setInstanceid(instanceId ?? randomUUID()); - req.setInput(input ? JSON.stringify(input) : undefined); - req.setScheduledstarttimestamp(startAt?.getTime()); + + const i = new StringValue(); + i.setValue(JSON.stringify(input)); + + const ts = new Timestamp(); + ts.fromDate(new Date(startAt?.getTime() ?? 0)); + + req.setInput(i); + req.setScheduledstarttimestamp(ts); console.log(`Starting new ${name} instance with ID = ${req.getInstanceid()}`); @@ -101,7 +110,11 @@ export class TaskHubGrpcClient { const req = new pb.RaiseEventRequest(); req.setInstanceid(instanceId); req.setName(eventName); - req.setInput(JSON.stringify(data)); + + const i = new StringValue(); + i.setValue(JSON.stringify(data)); + + req.setInput(i); console.log(`Raising event '${eventName}' for instance '${instanceId}'`); @@ -112,7 +125,11 @@ export class TaskHubGrpcClient { async terminateOrchestration(instanceId: string, output: any = null): Promise { const req = new pb.TerminateRequest(); req.setInstanceid(instanceId); - req.setOutput(JSON.stringify(output)); + + const i = new StringValue(); + i.setValue(JSON.stringify(output)); + + req.setOutput(i); console.log(`Terminating '${instanceId}'`); diff --git a/src/orchestration/index.ts b/src/orchestration/index.ts index 086de16..038e392 100644 --- a/src/orchestration/index.ts +++ b/src/orchestration/index.ts @@ -21,21 +21,36 @@ export function newOrchestrationState( failureDetails = new FailureDetails( state.getFailuredetails()?.getErrormessage() ?? "", state.getFailuredetails()?.getErrortype() ?? "", - state.getFailuredetails()?.getStacktrace(), + state.getFailuredetails()?.getStacktrace()?.toString(), ); } const status = OrchestrationStatus[state?.getOrchestrationstatus() ?? 0]; + // Convert Timestamp seconds and nanos to Date + const tsCreated = state?.getCreatedtimestamp(); + const tsUpdated = state?.getLastupdatedtimestamp(); + + let tsCreatedParsed = new Date(); + let tsUpdatedParsed = new Date(); + + if (tsCreated) { + tsCreatedParsed = new Date(tsCreated.getSeconds() * 1000 + tsCreated.getNanos() / 1000000); + } + + if (tsUpdated) { + tsUpdatedParsed = new Date(tsUpdated.getSeconds() * 1000 + tsUpdated.getNanos() / 1000000); + } + return new OrchestrationState( instanceId, state?.getName() ?? "", parseGrpcValue(state?.getOrchestrationstatus() ?? 0), - new Date(state?.getCreatedtimestamp()), - new Date(state?.getLastupdatedtimestamp()), - state?.getInput() ?? null, - state?.getOutput() ?? null, - state?.getCustomstatus() ?? null, + new Date(tsCreatedParsed), + new Date(tsUpdatedParsed), + state?.getInput()?.toString(), + state?.getOutput()?.toString(), + state?.getCustomstatus()?.toString(), failureDetails, ); } diff --git a/src/worker/activity-executor.ts b/src/worker/activity-executor.ts index 947e3c8..ec53c81 100644 --- a/src/worker/activity-executor.ts +++ b/src/worker/activity-executor.ts @@ -3,30 +3,30 @@ import { ActivityNotRegisteredError } from "./exception/activity-not-registered- import { Registry } from "./registry"; export class ActivityExecutor { - private _registry: Registry; + private _registry: Registry; - constructor(registry: Registry) { - this._registry = registry; - } + constructor(registry: Registry) { + this._registry = registry; + } - public execute(orchestrationId: string, name: string, taskId: number, encodedInput?: string): string | undefined { - const fn = this._registry.getActivity(name); + public execute(orchestrationId: string, name: string, taskId: number, encodedInput?: string): string | undefined { + const fn = this._registry.getActivity(name); - if (!fn) { - throw new ActivityNotRegisteredError(`Activity function ${name} is not registered`); - } + if (!fn) { + throw new ActivityNotRegisteredError(`Activity function ${name} is not registered`); + } - const activityInput = encodedInput ? JSON.parse(encodedInput) : undefined; - const ctx = new ActivityContext(orchestrationId, taskId); + const activityInput = encodedInput ? JSON.parse(encodedInput) : undefined; + const ctx = new ActivityContext(orchestrationId, taskId); - // Execute the activity function - const activityOutput = fn(ctx, activityInput); + // Execute the activity function + const activityOutput = fn(ctx, activityInput); - // Return the output - const encodedOutput = activityOutput ? JSON.stringify(activityOutput) : undefined; - const chars = encodedOutput ? encodedOutput.length : 0; - console.log(`Activity ${name} completed with output ${encodedOutput} (${chars} chars)`); + // Return the output + const encodedOutput = activityOutput ? JSON.stringify(activityOutput) : undefined; + const chars = encodedOutput ? encodedOutput.length : 0; + console.log(`Activity ${name} completed with output ${encodedOutput} (${chars} chars)`); - return encodedOutput; - } -} \ No newline at end of file + return encodedOutput; + } +} diff --git a/src/worker/task-hub-grpc-worker.ts b/src/worker/task-hub-grpc-worker.ts index 1e400d7..0e49523 100644 --- a/src/worker/task-hub-grpc-worker.ts +++ b/src/worker/task-hub-grpc-worker.ts @@ -10,6 +10,9 @@ import { GrpcClient } from "../client-grpc"; import { promisify } from "util"; import { Empty } from "google-protobuf/google/protobuf/empty_pb"; import * as pbh from "../utils/pb-helper.util"; +import { OrchestrationExecutor } from "./orchestration-executor"; +import { ActivityExecutor } from "./activity-executor"; +import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb"; export class TaskHubGrpcWorker { private _responseStream: grpc.ClientReadableStream | null; @@ -137,7 +140,11 @@ export class TaskHubGrpcWorker { const failureDetails = pbh.newFailureDetails(e); const actions = [ - pbh.newCompleteOrchestrationAction(-1, pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, failureDetails), + pbh.newCompleteOrchestrationAction( + -1, + pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, + failureDetails?.toString(), + ), ]; res = new pb.OrchestratorResponse(); @@ -167,12 +174,15 @@ export class TaskHubGrpcWorker { try { const executor = new ActivityExecutor(this._registry); - const result = await executor.execute(req.getName(), req.getInput()); + const result = await executor.execute(req.getName(), req.getInput()?.toString() ?? "", req.getTaskid()); + + const s = new StringValue(); + s.setValue(result ?? ""); res = new pb.ActivityResponse(); res.setInstanceid(instanceId); res.setTaskid(req.getTaskid()); - res.setResult(result); + res.setResult(s); } catch (e: any) { console.error(e); console.log(`An error occurred while trying to execute activity '${req.getName()}': ${e.message}`); diff --git a/test/e2e/orchestration.spec.ts b/test/e2e/orchestration.spec.ts index ad09e49..e5adab4 100644 --- a/test/e2e/orchestration.spec.ts +++ b/test/e2e/orchestration.spec.ts @@ -1,3 +1,4 @@ +import { spawn } from "child_process"; import { TaskHubGrpcClient } from "../../src/client"; import { OrchestrationStatus } from "../../src/proto/orchestrator_service_pb"; import { getName, whenAll, whenAny } from "../../src/task"; @@ -10,16 +11,12 @@ describe("Durable Functions", () => { let taskHubClient: TaskHubGrpcClient; let taskHubWorker: TaskHubGrpcWorker; - beforeAll(async () => { - // Ensure the sidecar process is running - // docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator - // TODO: - }); + beforeAll(async () => {}); beforeEach(async () => { // Start a worker, which will connect to the sidecar in a background thread taskHubWorker = new TaskHubGrpcWorker(); - taskHubClient = new TaskHubGrpcClient(); + taskHubClient = new TaskHubGrpcClient("localhost:4001"); }); afterEach(async () => { From 39c8d62efc7e20f709b3ea08bd44e6032b6e4e49 Mon Sep 17 00:00:00 2001 From: Xavier Geerinck Date: Fri, 4 Aug 2023 18:51:35 -0700 Subject: [PATCH 4/6] add test e2e script Signed-off-by: Xavier Geerinck --- .github/workflows/test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 81ced0a..b43ef2b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -23,4 +23,4 @@ jobs: run: npm test test/unit - name: ✅ Run e2e tests - run: npm test test/e2e + run: ./scripts/test-e2e.sh From 41aa62d382cf4774b1a4a52655da0823d2594489 Mon Sep 17 00:00:00 2001 From: Xavier Geerinck Date: Fri, 4 Aug 2023 18:53:01 -0700 Subject: [PATCH 5/6] ensure test fails if script fails Signed-off-by: Xavier Geerinck --- scripts/test-e2e.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scripts/test-e2e.sh b/scripts/test-e2e.sh index 667f372..7fe8328 100755 --- a/scripts/test-e2e.sh +++ b/scripts/test-e2e.sh @@ -10,5 +10,11 @@ docker run \ echo "Running E2E tests" npm run test test/e2e +# It should fail if the npm run fails +if [ $? -ne 0 ]; then + echo "E2E tests failed" + exit 1 +fi + echo "Stopping Sidecar" docker stop durabletask-sidecar \ No newline at end of file From e90d6c660592efedc6e380de027746532fa27009 Mon Sep 17 00:00:00 2001 From: Xavier Geerinck Date: Mon, 14 Aug 2023 08:34:33 +0200 Subject: [PATCH 6/6] misc changes Signed-off-by: Xavier Geerinck --- src/worker/task-hub-grpc-worker.ts | 2 +- test/e2e/orchestration.spec.ts | 462 +++++++++++++++-------------- 2 files changed, 233 insertions(+), 231 deletions(-) diff --git a/src/worker/task-hub-grpc-worker.ts b/src/worker/task-hub-grpc-worker.ts index 0e49523..356af37 100644 --- a/src/worker/task-hub-grpc-worker.ts +++ b/src/worker/task-hub-grpc-worker.ts @@ -66,8 +66,8 @@ export class TaskHubGrpcWorker { throw new Error("The worker is already running."); } - // send a "Hello" message to the sidecar to ensure that it's listening const stubHello = promisify(stub.hello); + console.log(stubHello); await stubHello(new Empty()); // Open a stream to get the work items diff --git a/test/e2e/orchestration.spec.ts b/test/e2e/orchestration.spec.ts index e5adab4..7750cf0 100644 --- a/test/e2e/orchestration.spec.ts +++ b/test/e2e/orchestration.spec.ts @@ -51,237 +51,239 @@ describe("Durable Functions", () => { expect(state?.serializedInput).toBeUndefined(); expect(state?.serializedOutput).toBeUndefined(); expect(state?.serializedCustomStatus).toBeUndefined(); - }); - - it("should be able to run an activity sequence", async () => { - const plusOne = async (_: ActivityContext, input: number) => { - return input + 1; - }; - - const sequence: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any { - const numbers = [startVal]; - let current = startVal; - - for (let i = 0; i < 10; i++) { - current = yield ctx.callActivity(plusOne, current); - numbers.push(current); - } - - return numbers; - }; - - taskHubWorker.addOrchestrator(sequence); - taskHubWorker.addActivity(plusOne); - await taskHubWorker.start(); - - const id = await taskHubClient.scheduleNewOrchestration(sequence, 1); - const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - - expect(state); - expect(state?.name).toEqual(getName(sequence)); - expect(state?.instanceId).toEqual(id); - expect(state?.failureDetails).toBeUndefined(); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.serializedInput).toEqual(JSON.stringify(1)); - expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])); - expect(state?.serializedCustomStatus).toBeUndefined(); - }); - - it("should be able to use the sub-orchestration for fan-out", async () => { - let activityCounter = 0; - - const increment = (ctx: ActivityContext, _: any) => { - activityCounter++; - }; - - const orchestratorChild: TOrchestrator = async function* (ctx: OrchestrationContext, activityCount: number): any { - for (let i = 0; i < activityCount; i++) { - yield ctx.callActivity(increment); - } - }; - - const orchestratorParent: TOrchestrator = async function* (ctx: OrchestrationContext, count: number): any { - // Fan out to multiple sub-orchestrations - const tasks = []; - - for (let i = 0; i < count; i++) { - tasks.push(ctx.callSubOrchestrator(orchestratorChild, 3)); - } - - // Wait for all the sub-orchestrations to complete - yield whenAll(tasks); - }; - - taskHubWorker.addActivity(increment); - taskHubWorker.addOrchestrator(orchestratorChild); - taskHubWorker.addOrchestrator(orchestratorParent); - await taskHubWorker.start(); - - const id = await taskHubClient.scheduleNewOrchestration(orchestratorParent, 10); - const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.failureDetails).toBeUndefined(); - expect(activityCounter).toEqual(30); - }); - - it("should allow waiting for multiple extenral events", async () => { - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { - const a = yield ctx.waitForExternalEvent("A"); - const b = yield ctx.waitForExternalEvent("B"); - const c = yield ctx.waitForExternalEvent("C"); - return [a, b, c]; - }; - taskHubWorker.addOrchestrator(orchestrator); - await taskHubWorker.start(); - - // Send events to the client immediately - const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - taskHubClient.raiseOrchestrationEvent(id, "A", "a"); - taskHubClient.raiseOrchestrationEvent(id, "B", "b"); - taskHubClient.raiseOrchestrationEvent(id, "C", "c"); - const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.serializedOutput).toEqual(JSON.stringify(["a", "b", "c"])); - }); - - it("should wait for external events with a timeout", async () => { - for (const shouldRaiseEvent of [true, false]) { - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { - const approval = ctx.waitForExternalEvent("Approval"); - const timeout = ctx.createTimer(3 * 1000); - const winner = yield whenAny([approval, timeout]); - - if (winner == approval) { - return "approved"; - } else { - return "timed out"; - } - }; - - taskHubWorker.addOrchestrator(orchestrator); - await taskHubWorker.start(); - - // Send events to the client immediately - const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - - if (shouldRaiseEvent) { - taskHubClient.raiseOrchestrationEvent(id, "Approval"); - } - - const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - - if (shouldRaiseEvent) { - expect(state?.serializedOutput).toEqual(JSON.stringify("approved")); - } else { - expect(state?.serializedOutput).toEqual(JSON.stringify("timed out")); - } - } - }); - - it("should be able to use suspend and resume", async () => { - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { - const res = yield ctx.waitForExternalEvent("my_event"); - return res; - }; - - taskHubWorker.addOrchestrator(orchestrator); - await taskHubWorker.start(); - - // Send events to the client immediately - const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - let state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - expect(state); - - // Suspend the orchestration and wait for it to go into the SUSPENDED state - await taskHubClient.suspendOrchestration(id); - - // TODO: is this needed in JS? We use a promise above - // while (state?.runtimeStatus == OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING) { - // await new Promise((resolve) => setTimeout(resolve, 100)); - // state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - // expect(state); - // } - - // Raise an event to the orchestration and confirm that it does NOT complete - taskHubClient.raiseOrchestrationEvent(id, "my_event", 42); - - try { - state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 3); - // TODO - // assert False, "Orchestration should not have been completed" - } catch (e) { - // pass - } - - // Resume the orchestration and wait for it to complete - taskHubClient.resumeOrchestration(id); - state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.serializedOutput).toEqual(JSON.stringify(42)); - }); - - it("should be able to terminate an orchestration", async () => { - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { - const res = yield ctx.waitForExternalEvent("my_event"); - return res; - }; - - taskHubWorker.addOrchestrator(orchestrator); - await taskHubWorker.start(); - - const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - let state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING); - - taskHubClient.terminateOrchestration(id, "some reason for termination"); - state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); - expect(state?.serializedOutput).toEqual(JSON.stringify("some reason for termination")); + console.log("T"); }); - it("should allow to continue as new", async () => { - const allResults: any[] = []; - - const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { - const res = yield ctx.waitForExternalEvent("my_event"); - - if (!ctx.isReplaying) { - allResults.push(res); - } - - if (allResults.length <= 4) { - ctx.continueAsNew(Math.max(...allResults), true); - } else { - return allResults; - } - }; - - taskHubWorker.addOrchestrator(orchestrator); - await taskHubWorker.start(); - - const id = await taskHubClient.scheduleNewOrchestration(orchestrator); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 1); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 2); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 3); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 4); - taskHubClient.raiseOrchestrationEvent(id, "my_event", 5); - - const state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); - expect(state); - expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - expect(state?.serializedInput).toEqual(JSON.stringify(4)); - expect(state?.serializedOutput).toEqual(JSON.stringify(allResults)); - expect(allResults).toEqual([1, 2, 3, 4, 5]); - }); + // it("should be able to run an activity sequence", async () => { + // const plusOne = async (_: ActivityContext, input: number) => { + // return input + 1; + // }; + + // const sequence: TOrchestrator = async function* (ctx: OrchestrationContext, startVal: number): any { + // const numbers = [startVal]; + // let current = startVal; + + // for (let i = 0; i < 10; i++) { + // current = yield ctx.callActivity(plusOne, current); + // numbers.push(current); + // } + + // return numbers; + // }; + + // taskHubWorker.addOrchestrator(sequence); + // taskHubWorker.addActivity(plusOne); + // await taskHubWorker.start(); + + // const id = await taskHubClient.scheduleNewOrchestration(sequence, 1); + // const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + // expect(state); + // expect(state?.name).toEqual(getName(sequence)); + // expect(state?.instanceId).toEqual(id); + // expect(state?.failureDetails).toBeUndefined(); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + // expect(state?.serializedInput).toEqual(JSON.stringify(1)); + // expect(state?.serializedOutput).toEqual(JSON.stringify([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])); + // expect(state?.serializedCustomStatus).toBeUndefined(); + // }); + + // it("should be able to use the sub-orchestration for fan-out", async () => { + // let activityCounter = 0; + + // const increment = (ctx: ActivityContext, _: any) => { + // activityCounter++; + // }; + + // const orchestratorChild: TOrchestrator = async function* (ctx: OrchestrationContext, activityCount: number): any { + // for (let i = 0; i < activityCount; i++) { + // yield ctx.callActivity(increment); + // } + // }; + + // const orchestratorParent: TOrchestrator = async function* (ctx: OrchestrationContext, count: number): any { + // // Fan out to multiple sub-orchestrations + // const tasks = []; + + // for (let i = 0; i < count; i++) { + // tasks.push(ctx.callSubOrchestrator(orchestratorChild, 3)); + // } + + // // Wait for all the sub-orchestrations to complete + // yield whenAll(tasks); + // }; + + // taskHubWorker.addActivity(increment); + // taskHubWorker.addOrchestrator(orchestratorChild); + // taskHubWorker.addOrchestrator(orchestratorParent); + // await taskHubWorker.start(); + + // const id = await taskHubClient.scheduleNewOrchestration(orchestratorParent, 10); + // const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + // expect(state); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + // expect(state?.failureDetails).toBeUndefined(); + // expect(activityCounter).toEqual(30); + // }); + + // it("should allow waiting for multiple extenral events", async () => { + // const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { + // const a = yield ctx.waitForExternalEvent("A"); + // const b = yield ctx.waitForExternalEvent("B"); + // const c = yield ctx.waitForExternalEvent("C"); + // return [a, b, c]; + // }; + + // taskHubWorker.addOrchestrator(orchestrator); + // await taskHubWorker.start(); + + // // Send events to the client immediately + // const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + // taskHubClient.raiseOrchestrationEvent(id, "A", "a"); + // taskHubClient.raiseOrchestrationEvent(id, "B", "b"); + // taskHubClient.raiseOrchestrationEvent(id, "C", "c"); + // const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + // expect(state); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + // expect(state?.serializedOutput).toEqual(JSON.stringify(["a", "b", "c"])); + // }); + + // it("should wait for external events with a timeout", async () => { + // for (const shouldRaiseEvent of [true, false]) { + // const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { + // const approval = ctx.waitForExternalEvent("Approval"); + // const timeout = ctx.createTimer(3 * 1000); + // const winner = yield whenAny([approval, timeout]); + + // if (winner == approval) { + // return "approved"; + // } else { + // return "timed out"; + // } + // }; + + // taskHubWorker.addOrchestrator(orchestrator); + // await taskHubWorker.start(); + + // // Send events to the client immediately + // const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + + // if (shouldRaiseEvent) { + // taskHubClient.raiseOrchestrationEvent(id, "Approval"); + // } + + // const state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + + // expect(state); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + // if (shouldRaiseEvent) { + // expect(state?.serializedOutput).toEqual(JSON.stringify("approved")); + // } else { + // expect(state?.serializedOutput).toEqual(JSON.stringify("timed out")); + // } + // } + // }); + + // it("should be able to use suspend and resume", async () => { + // const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { + // const res = yield ctx.waitForExternalEvent("my_event"); + // return res; + // }; + + // taskHubWorker.addOrchestrator(orchestrator); + // await taskHubWorker.start(); + + // // Send events to the client immediately + // const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + // let state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + // expect(state); + + // // Suspend the orchestration and wait for it to go into the SUSPENDED state + // await taskHubClient.suspendOrchestration(id); + + // // TODO: is this needed in JS? We use a promise above + // // while (state?.runtimeStatus == OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING) { + // // await new Promise((resolve) => setTimeout(resolve, 100)); + // // state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + // // expect(state); + // // } + + // // Raise an event to the orchestration and confirm that it does NOT complete + // taskHubClient.raiseOrchestrationEvent(id, "my_event", 42); + + // try { + // state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 3); + // // TODO + // // assert False, "Orchestration should not have been completed" + // } catch (e) { + // // pass + // } + + // // Resume the orchestration and wait for it to complete + // taskHubClient.resumeOrchestration(id); + // state = await taskHubClient.waitForOrchestrationCompletion(id, undefined, 30); + // expect(state); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + // expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + // }); + + // it("should be able to terminate an orchestration", async () => { + // const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, _: any): any { + // const res = yield ctx.waitForExternalEvent("my_event"); + // return res; + // }; + + // taskHubWorker.addOrchestrator(orchestrator); + // await taskHubWorker.start(); + + // const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + // let state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); + // expect(state); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING); + + // taskHubClient.terminateOrchestration(id, "some reason for termination"); + // state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); + // expect(state); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + // expect(state?.serializedOutput).toEqual(JSON.stringify("some reason for termination")); + // }); + + // it("should allow to continue as new", async () => { + // const allResults: any[] = []; + + // const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext, input: number): any { + // const res = yield ctx.waitForExternalEvent("my_event"); + + // if (!ctx.isReplaying) { + // allResults.push(res); + // } + + // if (allResults.length <= 4) { + // ctx.continueAsNew(Math.max(...allResults), true); + // } else { + // return allResults; + // } + // }; + + // taskHubWorker.addOrchestrator(orchestrator); + // await taskHubWorker.start(); + + // const id = await taskHubClient.scheduleNewOrchestration(orchestrator); + // taskHubClient.raiseOrchestrationEvent(id, "my_event", 1); + // taskHubClient.raiseOrchestrationEvent(id, "my_event", 2); + // taskHubClient.raiseOrchestrationEvent(id, "my_event", 3); + // taskHubClient.raiseOrchestrationEvent(id, "my_event", 4); + // taskHubClient.raiseOrchestrationEvent(id, "my_event", 5); + + // const state = await taskHubClient.waitForOrchestrationStart(id, undefined, 30); + // expect(state); + // expect(state?.runtimeStatus).toEqual(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + // expect(state?.serializedInput).toEqual(JSON.stringify(4)); + // expect(state?.serializedOutput).toEqual(JSON.stringify(allResults)); + // expect(allResults).toEqual([1, 2, 3, 4, 5]); + // }); });