Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add testing workflow #1

Merged
merged 6 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: 🚀 Test and Build

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
build:
runs-on: ubuntu-latest

steps:
- name: 📥 Checkout code
uses: actions/checkout@v2

- name: ⚙️ Install dependencies
run: npm install

- name: ✅ Run unit tests
run: npm test test/unit

- name: ✅ Run e2e tests
run: ./scripts/test-e2e.sh
20 changes: 20 additions & 0 deletions scripts/test-e2e.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
echo "Starting Sidecar"
docker run \
--name durabletask-sidecar -d --rm \
-p 4001:4001 \
--env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' \
cgillum/durabletask-sidecar:latest start \
--backend Emulator

echo "Running E2E tests"
npm run test test/e2e

# It should fail if the npm run fails
if [ $? -ne 0 ]; then
echo "E2E tests failed"
exit 1
fi

echo "Stopping Sidecar"
docker stop durabletask-sidecar
27 changes: 22 additions & 5 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb";
import { Timestamp } from "google-protobuf/google/protobuf/timestamp_pb";
import * as pb from "./proto/orchestrator_service_pb";
import * as stubs from "./proto/orchestrator_service_grpc_pb";
import { TOrchestrator } from "./types/orchestrator.type";
Expand All @@ -18,7 +20,7 @@ export class TaskHubGrpcClient {
}

async scheduleNewOrchestration(
orchestrator: TOrchestrator<TInput, TOutput>,
orchestrator: TOrchestrator,
input?: TInput,
instanceId?: string,
startAt?: Date,
Expand All @@ -28,8 +30,15 @@ export class TaskHubGrpcClient {
const req = new pb.CreateInstanceRequest();
req.setName(name);
req.setInstanceid(instanceId ?? randomUUID());
req.setInput(input ? JSON.stringify(input) : undefined);
req.setScheduledstarttimestamp(startAt?.getTime());

const i = new StringValue();
i.setValue(JSON.stringify(input));

const ts = new Timestamp();
ts.fromDate(new Date(startAt?.getTime() ?? 0));

req.setInput(i);
req.setScheduledstarttimestamp(ts);

console.log(`Starting new ${name} instance with ID = ${req.getInstanceid()}`);

Expand Down Expand Up @@ -101,7 +110,11 @@ export class TaskHubGrpcClient {
const req = new pb.RaiseEventRequest();
req.setInstanceid(instanceId);
req.setName(eventName);
req.setInput(JSON.stringify(data));

const i = new StringValue();
i.setValue(JSON.stringify(data));

req.setInput(i);

console.log(`Raising event '${eventName}' for instance '${instanceId}'`);

Expand All @@ -112,7 +125,11 @@ export class TaskHubGrpcClient {
async terminateOrchestration(instanceId: string, output: any = null): Promise<void> {
const req = new pb.TerminateRequest();
req.setInstanceid(instanceId);
req.setOutput(JSON.stringify(output));

const i = new StringValue();
i.setValue(JSON.stringify(output));

req.setOutput(i);

console.log(`Terminating '${instanceId}'`);

Expand Down
27 changes: 21 additions & 6 deletions src/orchestration/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,36 @@ export function newOrchestrationState(
failureDetails = new FailureDetails(
state.getFailuredetails()?.getErrormessage() ?? "",
state.getFailuredetails()?.getErrortype() ?? "",
state.getFailuredetails()?.getStacktrace(),
state.getFailuredetails()?.getStacktrace()?.toString(),
);
}

const status = OrchestrationStatus[state?.getOrchestrationstatus() ?? 0];

// Convert Timestamp seconds and nanos to Date
const tsCreated = state?.getCreatedtimestamp();
const tsUpdated = state?.getLastupdatedtimestamp();

let tsCreatedParsed = new Date();
let tsUpdatedParsed = new Date();

if (tsCreated) {
tsCreatedParsed = new Date(tsCreated.getSeconds() * 1000 + tsCreated.getNanos() / 1000000);
}

if (tsUpdated) {
tsUpdatedParsed = new Date(tsUpdated.getSeconds() * 1000 + tsUpdated.getNanos() / 1000000);
}

return new OrchestrationState(
instanceId,
state?.getName() ?? "",
parseGrpcValue(state?.getOrchestrationstatus() ?? 0),
new Date(state?.getCreatedtimestamp()),
new Date(state?.getLastupdatedtimestamp()),
state?.getInput() ?? null,
state?.getOutput() ?? null,
state?.getCustomstatus() ?? null,
new Date(tsCreatedParsed),
new Date(tsUpdatedParsed),
state?.getInput()?.toString(),
state?.getOutput()?.toString(),
state?.getCustomstatus()?.toString(),
failureDetails,
);
}
40 changes: 20 additions & 20 deletions src/worker/activity-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,30 @@ import { ActivityNotRegisteredError } from "./exception/activity-not-registered-
import { Registry } from "./registry";

export class ActivityExecutor {
private _registry: Registry;
private _registry: Registry;

constructor(registry: Registry) {
this._registry = registry;
}
constructor(registry: Registry) {
this._registry = registry;
}

public execute(orchestrationId: string, name: string, taskId: number, encodedInput?: string): string | undefined {
const fn = this._registry.getActivity(name);
public execute(orchestrationId: string, name: string, taskId: number, encodedInput?: string): string | undefined {
const fn = this._registry.getActivity(name);

if (!fn) {
throw new ActivityNotRegisteredError(`Activity function ${name} is not registered`);
}
if (!fn) {
throw new ActivityNotRegisteredError(`Activity function ${name} is not registered`);
}

const activityInput = encodedInput ? JSON.parse(encodedInput) : undefined;
const ctx = new ActivityContext(orchestrationId, taskId);
const activityInput = encodedInput ? JSON.parse(encodedInput) : undefined;
const ctx = new ActivityContext(orchestrationId, taskId);

// Execute the activity function
const activityOutput = fn(ctx, activityInput);
// Execute the activity function
const activityOutput = fn(ctx, activityInput);

// Return the output
const encodedOutput = activityOutput ? JSON.stringify(activityOutput) : undefined;
const chars = encodedOutput ? encodedOutput.length : 0;
console.log(`Activity ${name} completed with output ${encodedOutput} (${chars} chars)`);
// Return the output
const encodedOutput = activityOutput ? JSON.stringify(activityOutput) : undefined;
const chars = encodedOutput ? encodedOutput.length : 0;
console.log(`Activity ${name} completed with output ${encodedOutput} (${chars} chars)`);

return encodedOutput;
}
}
return encodedOutput;
}
}
18 changes: 14 additions & 4 deletions src/worker/task-hub-grpc-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { GrpcClient } from "../client-grpc";
import { promisify } from "util";
import { Empty } from "google-protobuf/google/protobuf/empty_pb";
import * as pbh from "../utils/pb-helper.util";
import { OrchestrationExecutor } from "./orchestration-executor";
import { ActivityExecutor } from "./activity-executor";
import { StringValue } from "google-protobuf/google/protobuf/wrappers_pb";

export class TaskHubGrpcWorker {
private _responseStream: grpc.ClientReadableStream<pb.WorkItem> | null;
Expand Down Expand Up @@ -63,8 +66,8 @@ export class TaskHubGrpcWorker {
throw new Error("The worker is already running.");
}

// send a "Hello" message to the sidecar to ensure that it's listening
const stubHello = promisify(stub.hello);
console.log(stubHello);
await stubHello(new Empty());

// Open a stream to get the work items
Expand Down Expand Up @@ -137,7 +140,11 @@ export class TaskHubGrpcWorker {
const failureDetails = pbh.newFailureDetails(e);

const actions = [
pbh.newCompleteOrchestrationAction(-1, pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED, failureDetails),
pbh.newCompleteOrchestrationAction(
-1,
pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED,
failureDetails?.toString(),
),
];

res = new pb.OrchestratorResponse();
Expand Down Expand Up @@ -167,12 +174,15 @@ export class TaskHubGrpcWorker {

try {
const executor = new ActivityExecutor(this._registry);
const result = await executor.execute(req.getName(), req.getInput());
const result = await executor.execute(req.getName(), req.getInput()?.toString() ?? "", req.getTaskid());

const s = new StringValue();
s.setValue(result ?? "");

res = new pb.ActivityResponse();
res.setInstanceid(instanceId);
res.setTaskid(req.getTaskid());
res.setResult(result);
res.setResult(s);
} catch (e: any) {
console.error(e);
console.log(`An error occurred while trying to execute activity '${req.getName()}': ${e.message}`);
Expand Down
Loading