-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathserver.ts
328 lines (302 loc) · 13.5 KB
/
server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
import Koa from 'koa';
import Router from '@koa/router';
import { bodyParser } from '@koa/bodyparser';
import cors from "@koa/cors";
import {
RequestIDHeader,
HandlerContextImpl,
HandlerRegistration,
} from "./handler";
import { ArgSources, APITypes } from "./handlerTypes";
import { Transaction } from "../transaction";
import { Workflow } from "../workflow";
import {
DBOSDataValidationError,
DBOSError,
DBOSResponseError,
isClientError,
} from "../error";
import { DBOSExecutor } from "../dbos-executor";
import { GlobalLogger as Logger } from "../telemetry/logs";
import { MiddlewareDefaults } from './middleware';
import { SpanStatusCode, trace, ROOT_CONTEXT } from '@opentelemetry/api';
import { Communicator } from '../communicator';
import * as net from 'net';
import { performance } from 'perf_hooks';
export const WorkflowUUIDHeader = "dbos-idempotency-key";
export const WorkflowRecoveryUrl = "/dbos-workflow-recovery"
export const HealthUrl = "/dbos-healthz"
export const PerfUrl = "/dbos-perf"
export class DBOSHttpServer {
readonly app: Koa;
readonly adminApp: Koa;
readonly applicationRouter: Router;
readonly adminRouter: Router;
readonly logger: Logger;
/**
* Create a Koa app.
* @param dbosExec User pass in an DBOS workflow executor instance.
* TODO: maybe call dbosExec.init() somewhere in this class?
*/
constructor(readonly dbosExec: DBOSExecutor) {
this.applicationRouter = new Router();
this.adminRouter = new Router();
this.logger = dbosExec.logger;
this.app = new Koa();
this.app.use(bodyParser());
this.app.use(cors());
this.adminApp = new Koa();
this.adminApp.use(bodyParser());
this.adminApp.use(cors());
// Register HTTP endpoints.
DBOSHttpServer.registerHealthEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerRecoveryEndpoint(this.dbosExec, this.adminRouter);
DBOSHttpServer.registerPerfEndpoint(this.dbosExec, this.adminRouter);
this.adminApp.use(this.adminRouter.routes()).use(this.adminRouter.allowedMethods());
DBOSHttpServer.registerDecoratedEndpoints(this.dbosExec, this.applicationRouter);
this.app.use(this.applicationRouter.routes()).use(this.applicationRouter.allowedMethods());
}
/**
* Register HTTP endpoints and attach to the app. Then start the server at the given port.
* @param port
*/
async listen(port: number) {
try {
await this.checkPortAvailability(port, "127.0.0.1");
await this.checkPortAvailability(port, "::1");
} catch (error) {
this.logger.warn(`Port ${port} is already used. Please use the -p option to choose another port.`);
process.exit(1);
}
const appServer = this.app.listen(port, () => {
this.logger.info(`DBOS Server is running at http://localhost:${port}`);
});
const adminPort = port + 1
const adminServer = this.adminApp.listen(adminPort, () => {
this.logger.info(`DBOS Admin Server is running at http://localhost:${adminPort}`);
});
return {appServer: appServer, adminServer: adminServer}
}
async checkPortAvailability(port: number, host: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
const server = new net.Server();
server.on('error', (error: NodeJS.ErrnoException) => {
if (error.code === 'EADDRINUSE') {
reject(new Error(`Port ${port} is already in use`));
} else {
reject(error);
}
});
server.on('listening', () => {
server.close();
resolve();
});
server.listen({port:port, host: host},() => {
resolve();
});
});
}
/**
* Health check endpoint.
*/
static registerHealthEndpoint(dbosExec: DBOSExecutor, router: Router) {
// Handler function that parses request for recovery.
const healthHandler = async (koaCtxt: Koa.Context, koaNext: Koa.Next) => {
koaCtxt.body = "healthy";
await koaNext();
};
router.get(HealthUrl, healthHandler);
dbosExec.logger.debug(`DBOS Server Registered Healthz POST ${HealthUrl}`);
}
/**
* Register workflow recovery endpoint.
* Receives a list of executor IDs and returns a list of workflowUUIDs.
*/
static registerRecoveryEndpoint(dbosExec: DBOSExecutor, router: Router) {
// Handler function that parses request for recovery.
const recoveryHandler = async (koaCtxt: Koa.Context, koaNext: Koa.Next) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const executorIDs = koaCtxt.request.body as string[];
dbosExec.logger.info("Recovering workflows for executors: " + executorIDs.toString());
const recoverHandles = await dbosExec.recoverPendingWorkflows(executorIDs);
// Return a list of workflowUUIDs being recovered.
koaCtxt.body = await Promise.allSettled(recoverHandles.map((i) => i.getWorkflowUUID())).then((results) =>
results.filter((i) => i.status === "fulfilled").map((i) => (i as PromiseFulfilledResult<unknown>).value)
);
await koaNext();
};
router.post(WorkflowRecoveryUrl, recoveryHandler);
dbosExec.logger.debug(`DBOS Server Registered Recovery POST ${WorkflowRecoveryUrl}`);
}
/**
* Register performance endpoint.
* Returns information on VM performance since last call.
*/
static registerPerfEndpoint(dbosExec: DBOSExecutor, router: Router) {
let lastELU = performance.eventLoopUtilization()
const perfHandler = async (koaCtxt: Koa.Context, koaNext: Koa.Next) => {
const currELU = performance.eventLoopUtilization();
const elu = performance.eventLoopUtilization(currELU, lastELU);
koaCtxt.body = elu;
lastELU = currELU;
await koaNext();
};
router.get(PerfUrl, perfHandler);
dbosExec.logger.debug(`DBOS Server Registered Healthz POST ${HealthUrl}`);
}
/**
* Register decorated functions as HTTP endpoints.
*/
static registerDecoratedEndpoints(dbosExec: DBOSExecutor, router: Router) {
// Register user declared endpoints, wrap around the endpoint with request parsing and response.
dbosExec.registeredOperations.forEach((registeredOperation) => {
const ro = registeredOperation as HandlerRegistration<unknown, unknown[], unknown>;
if (ro.apiURL) {
// Check if we need to apply any Koa middleware.
const defaults = ro.defaults as MiddlewareDefaults;
if (defaults?.koaMiddlewares) {
defaults.koaMiddlewares.forEach((koaMiddleware) => {
dbosExec.logger.debug(`DBOS Server applying middleware ${koaMiddleware.name} to ${ro.apiURL}`);
router.use(ro.apiURL, koaMiddleware);
});
}
// Wrapper function that parses request and send response.
const wrappedHandler = async (koaCtxt: Koa.Context, koaNext: Koa.Next) => {
const oc: HandlerContextImpl = new HandlerContextImpl(dbosExec, koaCtxt);
try {
// Check for auth first
if (defaults?.authMiddleware) {
const res = await defaults.authMiddleware({
name: ro.name,
requiredRole: ro.getRequiredRoles(),
koaContext: koaCtxt,
logger: oc.logger,
span: oc.span,
getConfig: (key: string, def) => {
return oc.getConfig(key, def);
},
query: (query, ...args) => {
return dbosExec.userDatabase.queryFunction(query, ...args);
},
});
if (res) {
oc.authenticatedUser = res.authenticatedUser;
oc.authenticatedRoles = res.authenticatedRoles;
}
}
// Parse the arguments.
const args: unknown[] = [];
ro.args.forEach((marg, idx) => {
marg.argSource = marg.argSource ?? ArgSources.DEFAULT; // Assign a default value.
if (idx === 0) {
return; // Do not parse the context.
}
let foundArg = undefined;
if ((ro.apiType === APITypes.GET && marg.argSource === ArgSources.DEFAULT) || marg.argSource === ArgSources.QUERY) {
foundArg = koaCtxt.request.query[marg.name];
if (foundArg) {
args.push(foundArg);
}
} else if ((ro.apiType === APITypes.POST && marg.argSource === ArgSources.DEFAULT) || marg.argSource === ArgSources.BODY) {
if (!koaCtxt.request.body) {
throw new DBOSDataValidationError(`Argument ${marg.name} requires a method body.`);
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment
foundArg = koaCtxt.request.body[marg.name];
if (foundArg) {
args.push(foundArg);
}
}
// Try to parse the argument from the URL if nothing found.
if (!foundArg) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
args.push(koaCtxt.params[marg.name]);
}
//console.log(`found arg ${marg.name} ${idx} ${args[idx-1]}`);
});
// Extract workflow UUID from headers (if any).
// We pass in the specified workflow UUID to workflows and transactions, but doesn't restrict how handlers use it.
const headerWorkflowUUID = koaCtxt.get(WorkflowUUIDHeader);
// Finally, invoke the transaction/workflow/plain function and properly set HTTP response.
// If functions return successfully and hasn't set the body, we set the body to the function return value. The status code will be automatically set to 200 or 204 (if the body is null/undefined).
// In case of an exception:
// - If a client-side error is thrown, we return 400.
// - If an error contains a `status` field, we return the specified status code.
// - Otherwise, we return 500.
const wfParams = { parentCtx: oc, workflowUUID: headerWorkflowUUID };
if (ro.txnConfig) {
koaCtxt.body = await dbosExec.transaction(ro.registeredFunction as Transaction<unknown[], unknown>, wfParams, ...args);
} else if (ro.workflowConfig) {
koaCtxt.body = await (await dbosExec.workflow(ro.registeredFunction as Workflow<unknown[], unknown>, wfParams, ...args)).getResult();
} else if (ro.commConfig) {
koaCtxt.body = await dbosExec.external(ro.registeredFunction as Communicator<unknown[], unknown>, wfParams, ...args);
} else {
// Directly invoke the handler code.
const retValue = await ro.invoke(undefined, [oc, ...args]);
// Set the body to the return value unless the body is already set by the handler.
if (koaCtxt.body === undefined) {
koaCtxt.body = retValue;
}
}
oc.span.setStatus({ code: SpanStatusCode.OK });
} catch (e) {
if (e instanceof Error) {
oc.logger.error(e);
oc.span.setStatus({ code: SpanStatusCode.ERROR, message: e.message });
let st = (e as DBOSResponseError)?.status || 500;
const dbosErrorCode = (e as DBOSError)?.dbosErrorCode;
if (dbosErrorCode && isClientError(dbosErrorCode)) {
st = 400; // Set to 400: client-side error.
}
koaCtxt.status = st;
koaCtxt.message = e.message;
koaCtxt.body = {
status: st,
message: e.message,
details: e,
};
} else {
// FIXME we should have a standard, user friendly message for errors that are not instances of Error.
// using stringify() will not produce a pretty output, because our format function uses stringify() too.
oc.logger.error(JSON.stringify(e));
oc.span.setStatus({ code: SpanStatusCode.ERROR, message: JSON.stringify(e) });
koaCtxt.body = e;
koaCtxt.status = 500;
}
} finally {
// Inject trace context into response headers.
// We cannot use the defaultTextMapSetter to set headers through Koa
// So we provide a custom setter that sets headers through Koa's context.
// See https://github.com/open-telemetry/opentelemetry-js/blob/868f75e448c7c3a0efd75d72c448269f1375a996/packages/opentelemetry-core/src/trace/W3CTraceContextPropagator.ts#L74
interface Carrier {
context: Koa.Context;
}
oc.W3CTraceContextPropagator.inject(
trace.setSpanContext(ROOT_CONTEXT, oc.span.spanContext()),
{
context: koaCtxt,
},
{
set: (carrier: Carrier, key: string, value: string) => {
carrier.context.set(key, value);
},
}
);
dbosExec.tracer.endSpan(oc.span);
// Add requestID to response headers.
koaCtxt.set(RequestIDHeader, oc.request.requestID as string);
await koaNext();
}
};
// Actually register the endpoint.
if (ro.apiType === APITypes.GET) {
router.get(ro.apiURL, wrappedHandler);
dbosExec.logger.debug(`DBOS Server Registered GET ${ro.apiURL}`);
} else if (ro.apiType === APITypes.POST) {
router.post(ro.apiURL, wrappedHandler);
dbosExec.logger.debug(`DBOS Server Registered POST ${ro.apiURL}`);
}
}
});
}
}