-
Notifications
You must be signed in to change notification settings - Fork 83
/
node-universal-handler.ts
258 lines (246 loc) · 8.57 KB
/
node-universal-handler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// Copyright 2021-2024 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import type * as http from "http";
import type * as http2 from "http2";
import type * as stream from "stream";
import type { JsonValue } from "@bufbuild/protobuf";
import { Code, ConnectError } from "@connectrpc/connect";
import type {
UniversalServerRequest,
UniversalServerResponse,
} from "@connectrpc/connect/protocol";
import {
nodeHeaderToWebHeader,
webHeaderToNodeHeaders,
} from "./node-universal-header.js";
import {
connectErrorFromH2ResetCode,
connectErrorFromNodeReason,
} from "./node-error.js";
import type { ContextValues } from "@connectrpc/connect";
/**
* NodeHandlerFn is compatible with http.RequestListener and its equivalent
* for http2.
*/
export type NodeHandlerFn = (
request: NodeServerRequest,
response: NodeServerResponse,
) => void;
/**
* A Node.js server request from the http, https, or the http2 module.
*/
export type NodeServerRequest = http.IncomingMessage | http2.Http2ServerRequest;
/**
* A Node.js server response from the http, https, or the http2 module.
* Note that we are taking the liberty to patch the type of write() so
* that they are compatible with each other.
*/
export type NodeServerResponse = (
| Omit<http.ServerResponse, "write">
| Omit<http2.Http2ServerResponse, "write">
) & {
write(
chunk: string | Uint8Array,
callback?: (err: Error | null | undefined) => void,
): boolean;
write(
chunk: string | Uint8Array,
encoding: BufferEncoding,
callback?: (err: Error | null | undefined) => void,
): boolean;
};
/**
* Converts a UniversalServerRequest to a Node.js server request.
* This function helps to implement adapters to server frameworks running
* on Node.js. Please be careful using this function in your own code, as we
* may have to make changes to it in the future.
*/
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
): UniversalServerRequest {
const encrypted =
"encrypted" in nodeRequest.socket && nodeRequest.socket.encrypted;
const protocol = encrypted ? "https" : "http";
const authority =
"authority" in nodeRequest
? nodeRequest.authority
: nodeRequest.headers.host;
const pathname = nodeRequest.url ?? "";
if (authority === undefined) {
throw new ConnectError(
"unable to determine request authority from Node.js server request",
Code.Internal,
);
}
const body =
parsedJsonBody !== undefined
? parsedJsonBody
: asyncIterableFromNodeServerRequest(nodeRequest);
const abortController = new AbortController();
if ("stream" in nodeRequest) {
// HTTP/2 has error codes we want to honor
nodeRequest.once("close", () => {
const err = connectErrorFromH2ResetCode(nodeRequest.stream.rstCode);
if (err !== undefined) {
abortController.abort(err);
} else {
abortController.abort();
}
});
} else {
// HTTP/1.1 does not have error codes, but Node.js has ECONNRESET
const onH1Error = (e: Error) => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
abortController.abort(connectErrorFromNodeReason(e));
};
const onH1Close = () => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
abortController.abort();
};
nodeRequest.once("error", onH1Error);
nodeRequest.once("close", onH1Close);
}
return {
httpVersion: nodeRequest.httpVersion,
method: nodeRequest.method ?? "",
url: new URL(pathname, `${protocol}://${authority}`).toString(),
header: nodeHeaderToWebHeader(nodeRequest.headers),
body,
signal: abortController.signal,
contextValues: contextValues,
};
}
/**
* Writes a UniversalServerResponse to a Node.js server response.
* This function helps to implement adapters to server frameworks running
* on Node.js. Please be careful using this function in your own code, as we
* may have to make changes to it in the future.
*/
export async function universalResponseToNodeResponse(
universalResponse: UniversalServerResponse,
nodeResponse: NodeServerResponse,
): Promise<void> {
const it = universalResponse.body?.[Symbol.asyncIterator]();
let isWriteError = false;
try {
if (it !== undefined) {
let chunk = await it.next();
isWriteError = true;
// we deliberately send headers after first read, not before,
// because we have to give the implementation a chance to
// set response headers
nodeResponse.writeHead(
universalResponse.status,
webHeaderToNodeHeaders(universalResponse.header),
);
isWriteError = false;
for (; chunk.done !== true; chunk = await it.next()) {
isWriteError = true;
await write(nodeResponse, chunk.value);
if (
"flush" in nodeResponse &&
typeof nodeResponse.flush == "function"
) {
// The npm package "compression" is an express middleware that is widely used,
// for example in next.js. It uses the npm package "compressible" to determine
// whether to apply compression to a response. Unfortunately, "compressible"
// matches every mime type that ends with "+json", causing our server-streaming
// RPCs to be buffered.
// The package modifies the response object, and adds a flush() method, which
// flushes the underlying gzip or deflate stream from the Node.js zlib module.
// The method is added here:
// https://github.com/expressjs/compression/blob/ad5113b98cafe1382a0ece30bb4673707ac59ce7/index.js#L70
nodeResponse.flush();
}
isWriteError = false;
}
}
if (!nodeResponse.headersSent) {
nodeResponse.writeHead(
universalResponse.status,
webHeaderToNodeHeaders(universalResponse.header),
);
}
if (universalResponse.trailer) {
nodeResponse.addTrailers(
webHeaderToNodeHeaders(universalResponse.trailer),
);
}
await new Promise<void>((resolve) => {
// The npm package "compression" crashes when a callback is passed to end()
// https://github.com/expressjs/compression/blob/ad5113b98cafe1382a0ece30bb4673707ac59ce7/index.js#L115
nodeResponse.once("end", resolve);
nodeResponse.end();
});
} catch (e) {
// Report write errors to the handler.
if (isWriteError) {
it?.throw?.(e).catch(() => {});
}
throw connectErrorFromNodeReason(e);
} finally {
it?.return?.().catch(() => {});
}
}
async function* asyncIterableFromNodeServerRequest(
request: NodeServerRequest,
): AsyncIterable<Uint8Array> {
for await (const chunk of request) {
yield chunk;
}
}
function write(stream: stream.Writable, data: Uint8Array): Promise<void> {
return new Promise<void>((resolve, reject) => {
if (stream.errored) {
return error(stream.errored);
}
stream.once("error", error);
stream.once("drain", drain);
// flushed == false: the stream wishes for the calling code to wait for
// the 'drain' event to be emitted before continuing to write additional
// data.
const flushed = stream.write(data, "binary", function (err) {
if (err && !flushed) {
// We are never getting a "drain" nor an "error" event if the stream
// has already ended (ERR_STREAM_WRITE_AFTER_END), so we have to
// resolve our promise in this callback.
error(err);
// However, once we do that (and remove our event listeners), we _do_
// get an "error" event, which ends up as an uncaught exception.
// We silence this error specifically with the following listener.
// All of this seems very fragile.
stream.once("error", () => {
//
});
}
});
if (flushed) {
drain();
}
function error(err: Error) {
stream.off("error", error);
stream.off("drain", drain);
reject(err);
}
function drain() {
stream.off("error", error);
stream.off("drain", drain);
resolve();
}
});
}