From 133b3901d6999a84603cf7d6e6a6b4f3f8766cc8 Mon Sep 17 00:00:00 2001 From: Satya Rohith Date: Tue, 5 Mar 2024 15:55:16 +0530 Subject: [PATCH] wip: need to get addTrailers to work first --- a.mjs | 14 + ext/node/polyfills/http2.ts | 747 ++++++++++++++++++++++++++++++++---- main.mjs | 7 + tests/unit/serve_test.ts | 2 +- 4 files changed, 685 insertions(+), 85 deletions(-) create mode 100644 a.mjs create mode 100644 main.mjs diff --git a/a.mjs b/a.mjs new file mode 100644 index 00000000000000..53adf114b9ee70 --- /dev/null +++ b/a.mjs @@ -0,0 +1,14 @@ +import http2 from "node:http2"; + +const server = http2.createServer((req, res) => { + console.log("handler called"); + console.log(req); + res.setHeader("Content-Type", "text/html"); + res.setHeader("X-Foo", "bar"); + res.writeHead(200, { "Content-Type": "text/plain; charset=utf-8" }); + res.write("Hello, World!"); + console.log(res); + res.end(); +}); + +server.listen(8000); diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 8453e44ce4263c..3ac1c152a67163 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -21,6 +21,8 @@ import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { Readable } from "node:stream"; import { EventEmitter } from "node:events"; import { Buffer } from "node:buffer"; +import { emitWarning } from "node:process"; +import Stream from "node:stream"; import { connect as netConnect, Server, Socket, TCP } from "node:net"; import { connect as tlsConnect } from "node:tls"; import { TypedArray } from "ext:deno_node/internal/util/types.ts"; @@ -42,11 +44,14 @@ import { ERR_HTTP2_CONNECT_PATH, ERR_HTTP2_CONNECT_SCHEME, ERR_HTTP2_GOAWAY_SESSION, + ERR_HTTP2_HEADERS_SENT, + ERR_HTTP2_INFO_STATUS_NOT_ALLOWED, ERR_HTTP2_INVALID_PSEUDOHEADER, ERR_HTTP2_INVALID_SESSION, ERR_HTTP2_INVALID_STREAM, ERR_HTTP2_NO_SOCKET_MANIPULATION, ERR_HTTP2_SESSION_ERROR, + ERR_HTTP2_STATUS_INVALID, ERR_HTTP2_STREAM_CANCEL, ERR_HTTP2_STREAM_ERROR, ERR_HTTP2_TOO_MANY_CUSTOM_SETTINGS, @@ -56,12 +61,19 @@ import { ERR_INVALID_ARG_VALUE, ERR_INVALID_HTTP_TOKEN, ERR_SOCKET_CLOSED, + ERR_STREAM_WRITE_AFTER_END, } from "ext:deno_node/internal/errors.ts"; import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts"; const { StringPrototypeTrim, FunctionPrototypeBind, + ObjectKeys, ReflectGetPrototypeOf, + ObjectAssign, + StringPrototypeToLowerCase, + ReflectApply, + ArrayIsArray, + ObjectPrototypeHasOwnProperty, } = primordials; const kSession = Symbol("session"); @@ -712,8 +724,11 @@ export class Http2Stream extends EventEmitter { return {}; } - sendTrailers(_headers: Record) { - addTrailers(this._response, [["grpc-status", "0"], ["grpc-message", "OK"]]); + sendTrailers(headers: Record) { + console.log("sendTrailers(headers): ", headers); + + // addTrailers(this._response, [["grpc-status", "0"], ["grpc-message", "OK"]]); + addTrailers(this._response, Object.entries(headers)); } } @@ -2056,9 +2071,6 @@ const kAborted = Symbol("aborted"); const kProxySocket = Symbol("proxySocket"); const kRequest = Symbol("request"); -let statusMessageWarned = false; -let statusConnectionHeaderWarned = false; - const proxySocketHandler = { has(stream, prop) { const ref = stream.session !== undefined ? stream.session[kSocket] : stream; @@ -2274,7 +2286,7 @@ class Http2ServerRequest extends Readable { // state.didRead = true; // this[kStream].on("data", onStreamData); // } else { - // process.nextTick(resumeStream, this[kStream]); + // nextTick(resumeStream, this[kStream]); // } // } @@ -2437,136 +2449,703 @@ function onStreamAbortedRequest() { // } // } -export class Http2ServerResponse { - constructor() { +function onStreamTrailersReady() { + this.sendTrailers(this[kResponse][kTrailers]); +} + +function onStreamCloseResponse() { + const res = this[kResponse]; + + if (res === undefined) { + return; } - addTrailers(_headers: Record) { - notImplemented("Http2ServerResponse.addTrailers"); + const state = res[kState]; + + if (this.headRequest !== state.headRequest) { + return; } - get connection(): Socket /*| TlsSocket*/ { - notImplemented("Http2ServerResponse.connection"); - return {}; + state.closed = true; + + this[kProxySocket] = null; + + this.removeListener("wantTrailers", onStreamTrailersReady); + this[kResponse] = undefined; + + res.emit("finish"); + res.emit("close"); +} + +function onStreamAbortedResponse() { + // non-op for now +} + +let statusMessageWarned = false; +let statusConnectionHeaderWarned = false; + +// Defines and implements an API compatibility layer on top of the core +// HTTP/2 implementation, intended to provide an interface that is as +// close as possible to the current require('http') API + +function statusMessageWarn() { + if (statusMessageWarned === false) { + emitWarning( + "Status message is not supported by HTTP/2 (RFC7540 8.1.2.4)", + "UnsupportedWarning", + ); + statusMessageWarned = true; } +} - createPushResponse( - _headers: Record, - _callback: () => unknown, - ) { - notImplemented("Http2ServerResponse.createPushResponse"); +function isConnectionHeaderAllowed(name, value) { + return name !== constants.HTTP2_HEADER_CONNECTION || + value === "trailers"; +} + +function connectionHeaderMessageWarn() { + if (statusConnectionHeaderWarned === false) { + emitWarning( + "The provided connection header is not valid, " + + "the value will be dropped from the header and " + + "will never be in use.", + "UnsupportedWarning", + ); + statusConnectionHeaderWarned = true; } +} - end( - _data: string | Buffer | Uint8Array, - _encoding: string, - _callback: () => unknown, - ) { - notImplemented("Http2ServerResponse.end"); +class Http2ServerResponse extends Stream { + writable = false; + req = null; + + constructor(stream, options) { + super(options); + this[kState] = { + closed: false, + ending: false, + destroyed: false, + headRequest: false, + sendDate: true, + statusCode: constants.HTTP_STATUS_OK, + }; + this[kHeaders] = { __proto__: null }; + this[kTrailers] = { __proto__: null }; + this[kStream] = stream; + stream[kProxySocket] = null; + stream[kResponse] = this; + this.writable = true; + this.req = stream[kRequest]; + stream.on("drain", onStreamDrain); + stream.on("aborted", onStreamAbortedResponse); + stream.on("close", onStreamCloseResponse); + stream.on("wantTrailers", onStreamTrailersReady); + stream.on("timeout", onStreamTimeout(kResponse)); } - get finished(): boolean { - notImplemented("Http2ServerResponse.finished"); - return false; + // User land modules such as finalhandler just check truthiness of this + // but if someone is actually trying to use this for more than that + // then we simply can't support such use cases + get _header() { + return this.headersSent; } - getHeader(_name: string): string { - notImplemented("Http2ServerResponse.getHeader"); - return ""; + get writableEnded() { + const state = this[kState]; + return state.ending; } - getHeaderNames(): string[] { - notImplemented("Http2ServerResponse.getHeaderNames"); - return []; + get finished() { + const state = this[kState]; + return state.ending; } - getHeaders(): Record { - notImplemented("Http2ServerResponse.getHeaders"); - return {}; + get socket() { + // This is compatible with http1 which removes socket reference + // only from ServerResponse but not IncomingMessage + if (this[kState].closed) { + return undefined; + } + + const stream = this[kStream]; + const proxySocket = stream[kProxySocket]; + if (proxySocket === null) { + return stream[kProxySocket] = new Proxy(stream, proxySocketHandler); + } + return proxySocket; } - hasHeader(_name: string) { - notImplemented("Http2ServerResponse.hasHeader"); + get connection() { + return this.socket; } - get headersSent(): boolean { - notImplemented("Http2ServerResponse.headersSent"); - return false; + get stream() { + return this[kStream]; } - removeHeader(_name: string) { - notImplemented("Http2ServerResponse.removeHeader"); + get headersSent() { + return this[kStream].headersSent; } - get req(): Http2ServerRequest { - notImplemented("Http2ServerResponse.req"); - return new Http2ServerRequest(); + get sendDate() { + return this[kState].sendDate; } - get sendDate(): boolean { - notImplemented("Http2ServerResponse.sendDate"); - return false; + set sendDate(bool) { + this[kState].sendDate = Boolean(bool); } - setHeader(_name: string, _value: string | string[]) { - notImplemented("Http2ServerResponse.setHeader"); + get statusCode() { + return this[kState].statusCode; } - setTimeout(msecs: number, callback?: () => unknown) { - this.stream.setTimeout(msecs, callback); + get writableCorked() { + return this[kStream].writableCorked; } - get socket(): Socket /*| TlsSocket*/ { - notImplemented("Http2ServerResponse.socket"); - return {}; + get writableHighWaterMark() { + return this[kStream].writableHighWaterMark; } - get statusCode(): number { - notImplemented("Http2ServerResponse.statusCode"); - return 0; + get writableFinished() { + return this[kStream].writableFinished; + } + + get writableLength() { + return this[kStream].writableLength; + } + + set statusCode(code) { + code |= 0; + if (code >= 100 && code < 200) { + throw new ERR_HTTP2_INFO_STATUS_NOT_ALLOWED(); + } + if (code < 100 || code > 599) { + throw new ERR_HTTP2_STATUS_INVALID(code); + } + this[kState].statusCode = code; + } + + setTrailer(name, value) { + // validateString(name, "name"); + name = StringPrototypeToLowerCase(StringPrototypeTrim(name)); + // assertValidHeader(name, value); + this[kTrailers][name] = value; } - get statusMessage(): string { - notImplemented("Http2ServerResponse.statusMessage"); + addTrailers(headers) { + const keys = ObjectKeys(headers); + let key = ""; + for (let i = 0; i < keys.length; i++) { + key = keys[i]; + this.setTrailer(key, headers[key]); + } + } + + getHeader(name) { + // validateString(name, "name"); + name = StringPrototypeToLowerCase(StringPrototypeTrim(name)); + return this[kHeaders][name]; + } + + getHeaderNames() { + return ObjectKeys(this[kHeaders]); + } + + getHeaders() { + const headers = { __proto__: null }; + return ObjectAssign(headers, this[kHeaders]); + } + + hasHeader(name) { + // validateString(name, "name"); + name = StringPrototypeToLowerCase(StringPrototypeTrim(name)); + return ObjectPrototypeHasOwnProperty(this[kHeaders], name); + } + + removeHeader(name) { + // validateString(name, "name"); + if (this[kStream].headersSent) { + throw new ERR_HTTP2_HEADERS_SENT(); + } + + name = StringPrototypeToLowerCase(StringPrototypeTrim(name)); + + if (name === "date") { + this[kState].sendDate = false; + + return; + } + + delete this[kHeaders][name]; + } + + setHeader(name, value) { + // validateString(name, "name"); + if (this[kStream].headersSent) { + throw new ERR_HTTP2_HEADERS_SENT(); + } + + this[kSetHeader](name, value); + } + + [kSetHeader](name, value) { + name = StringPrototypeToLowerCase(StringPrototypeTrim(name)); + // assertValidHeader(name, value); + + if (!isConnectionHeaderAllowed(name, value)) { + return; + } + + if (name[0] === ":") { + assertValidPseudoHeader(name); + } else if (!_checkIsHttpToken(name)) { + this.destroy(new ERR_INVALID_HTTP_TOKEN("Header name", name)); + } + + this[kHeaders][name] = value; + } + + appendHeader(name, value) { + // validateString(name, "name"); + if (this[kStream].headersSent) { + throw new ERR_HTTP2_HEADERS_SENT(); + } + + this[kAppendHeader](name, value); + } + + [kAppendHeader](name, value) { + name = StringPrototypeToLowerCase(StringPrototypeTrim(name)); + // assertValidHeader(name, value); + + if (!isConnectionHeaderAllowed(name, value)) { + return; + } + + if (name[0] === ":") { + assertValidPseudoHeader(name); + } else if (!_checkIsHttpToken(name)) { + this.destroy(new ERR_INVALID_HTTP_TOKEN("Header name", name)); + } + + // Handle various possible cases the same as OutgoingMessage.appendHeader: + const headers = this[kHeaders]; + if (headers === null || !headers[name]) { + return this.setHeader(name, value); + } + + if (!ArrayIsArray(headers[name])) { + headers[name] = [headers[name]]; + } + + const existingValues = headers[name]; + if (ArrayIsArray(value)) { + for (let i = 0, length = value.length; i < length; i++) { + existingValues.push(value[i]); + } + } else { + existingValues.push(value); + } + } + + get statusMessage() { + statusMessageWarn(); + return ""; } - get stream(): Http2Stream { - notImplemented("Http2ServerResponse.stream"); - return new Http2Stream(); + set statusMessage(msg) { + statusMessageWarn(); } - get writableEnded(): boolean { - notImplemented("Http2ServerResponse.writableEnded"); - return false; + flushHeaders() { + const state = this[kState]; + if (!state.closed && !this[kStream].headersSent) { + this.writeHead(state.statusCode); + } } - write( - _chunk: string | Buffer | Uint8Array, - _encoding: string, - _callback: () => unknown, - ) { - notImplemented("Http2ServerResponse.write"); - return this.write; + writeHead(statusCode, statusMessage, headers) { + const state = this[kState]; + + if (state.closed || this.stream.destroyed) { + return this; + } + if (this[kStream].headersSent) { + throw new ERR_HTTP2_HEADERS_SENT(); + } + + if (typeof statusMessage === "string") { + statusMessageWarn(); + } + + if (headers === undefined && typeof statusMessage === "object") { + headers = statusMessage; + } + + let i; + if (ArrayIsArray(headers)) { + if (this[kHeaders]) { + // Headers in obj should override previous headers but still + // allow explicit duplicates. To do so, we first remove any + // existing conflicts, then use appendHeader. This is the + // slow path, which only applies when you use setHeader and + // then pass headers in writeHead too. + + // We need to handle both the tuple and flat array formats, just + // like the logic further below. + if (headers.length && ArrayIsArray(headers[0])) { + for (let n = 0; n < headers.length; n += 1) { + const key = headers[n + 0][0]; + this.removeHeader(key); + } + } else { + for (let n = 0; n < headers.length; n += 2) { + const key = headers[n + 0]; + this.removeHeader(key); + } + } + } + + // Append all the headers provided in the array: + if (headers.length && ArrayIsArray(headers[0])) { + for (i = 0; i < headers.length; i++) { + const header = headers[i]; + this[kAppendHeader](header[0], header[1]); + } + } else { + if (headers.length % 2 !== 0) { + throw new ERR_INVALID_ARG_VALUE("headers", headers); + } + + for (i = 0; i < headers.length; i += 2) { + this[kAppendHeader](headers[i], headers[i + 1]); + } + } + } else if (typeof headers === "object") { + const keys = ObjectKeys(headers); + let key = ""; + for (i = 0; i < keys.length; i++) { + key = keys[i]; + this[kSetHeader](key, headers[key]); + } + } + + state.statusCode = statusCode; + this[kBeginSend](); + + return this; } - writeContinue() { - notImplemented("Http2ServerResponse.writeContinue"); + cork() { + this[kStream].cork(); } - writeEarlyHints(_hints: Record) { - notImplemented("Http2ServerResponse.writeEarlyHints"); + uncork() { + this[kStream].uncork(); } - writeHead( - _statusCode: number, - _statusMessage: string, - _headers: Record, - ) { - notImplemented("Http2ServerResponse.writeHead"); + write(chunk, encoding, cb) { + const state = this[kState]; + + if (typeof encoding === "function") { + cb = encoding; + encoding = "utf8"; + } + + let err; + if (state.ending) { + err = new ERR_STREAM_WRITE_AFTER_END(); + } else if (state.closed) { + err = new ERR_HTTP2_INVALID_STREAM(); + } else if (state.destroyed) { + return false; + } + + if (err) { + if (typeof cb === "function") { + nextTick(cb, err); + } + this.destroy(err); + return false; + } + + const stream = this[kStream]; + if (!stream.headersSent) { + this.writeHead(state.statusCode); + } + return stream.write(chunk, encoding, cb); + } + + end(chunk, encoding, cb) { + const stream = this[kStream]; + const state = this[kState]; + + if (typeof chunk === "function") { + cb = chunk; + chunk = null; + } else if (typeof encoding === "function") { + cb = encoding; + encoding = "utf8"; + } + + if ( + (state.closed || state.ending) && + state.headRequest === stream.headRequest + ) { + if (typeof cb === "function") { + nextTick(cb); + } + return this; + } + + if (chunk !== null && chunk !== undefined) { + this.write(chunk, encoding); + } + + state.headRequest = stream.headRequest; + state.ending = true; + + if (typeof cb === "function") { + if (stream.writableEnded) { + this.once("finish", cb); + } else { + stream.once("finish", cb); + } + } + + if (!stream.headersSent) { + this.writeHead(this[kState].statusCode); + } + + if (this[kState].closed || stream.destroyed) { + ReflectApply(onStreamCloseResponse, stream, []); + } else { + stream.end(); + } + + return this; + } + + destroy(err) { + if (this[kState].destroyed) { + return; + } + + this[kState].destroyed = true; + this[kStream].destroy(err); + } + + setTimeout(msecs, callback) { + if (this[kState].closed) { + return; + } + this[kStream].setTimeout(msecs, callback); + } + + createPushResponse(headers, callback) { + // validateFunction(callback, "callback"); + if (this[kState].closed) { + nextTick(callback, new ERR_HTTP2_INVALID_STREAM()); + return; + } + this[kStream].pushStream(headers, {}, (err, stream, headers, options) => { + if (err) { + callback(err); + return; + } + callback(null, new Http2ServerResponse(stream)); + }); + } + + [kBeginSend]() { + const state = this[kState]; + const headers = this[kHeaders]; + headers[constants.HTTP2_HEADER_STATUS] = state.statusCode; + const options = { + endStream: state.ending, + waitForTrailers: true, + sendDate: state.sendDate, + }; + this[kStream].respond(headers, options); + } + + // TODO doesn't support callbacks + writeContinue() { + const stream = this[kStream]; + if (stream.headersSent || this[kState].closed) { + return false; + } + stream.additionalHeaders({ + [constants.HTTP2_HEADER_STATUS]: constants.HTTP_STATUS_CONTINUE, + }); + return true; + } + + writeEarlyHints(hints) { + // validateObject(hints, "hints"); + + const headers = { __proto__: null }; + + // const linkHeaderValue = validateLinkHeaderValue(hints.link); + + for (const key of ObjectKeys(hints)) { + if (key !== "link") { + headers[key] = hints[key]; + } + } + + // if (linkHeaderValue.length === 0) { + // return false; + // } + + const stream = this[kStream]; + + if (stream.headersSent || this[kState].closed) { + return false; + } + + stream.additionalHeaders({ + ...headers, + [constants.HTTP2_HEADER_STATUS]: constants.HTTP_STATUS_EARLY_HINTS, + // "Link": linkHeaderValue, + }); + + return true; } } +// export class Http2ServerResponse { +// constructor() { +// } + +// addTrailers(_headers: Record) { +// notImplemented("Http2ServerResponse.addTrailers"); +// } + +// get connection(): Socket /*| TlsSocket*/ { +// notImplemented("Http2ServerResponse.connection"); +// return {}; +// } + +// createPushResponse( +// _headers: Record, +// _callback: () => unknown, +// ) { +// notImplemented("Http2ServerResponse.createPushResponse"); +// } + +// end( +// _data: string | Buffer | Uint8Array, +// _encoding: string, +// _callback: () => unknown, +// ) { +// notImplemented("Http2ServerResponse.end"); +// } + +// get finished(): boolean { +// notImplemented("Http2ServerResponse.finished"); +// return false; +// } + +// getHeader(_name: string): string { +// notImplemented("Http2ServerResponse.getHeader"); +// return ""; +// } + +// getHeaderNames(): string[] { +// notImplemented("Http2ServerResponse.getHeaderNames"); +// return []; +// } + +// getHeaders(): Record { +// notImplemented("Http2ServerResponse.getHeaders"); +// return {}; +// } + +// hasHeader(_name: string) { +// notImplemented("Http2ServerResponse.hasHeader"); +// } + +// get headersSent(): boolean { +// notImplemented("Http2ServerResponse.headersSent"); +// return false; +// } + +// removeHeader(_name: string) { +// notImplemented("Http2ServerResponse.removeHeader"); +// } + +// get req(): Http2ServerRequest { +// notImplemented("Http2ServerResponse.req"); +// return new Http2ServerRequest(); +// } + +// get sendDate(): boolean { +// notImplemented("Http2ServerResponse.sendDate"); +// return false; +// } + +// setHeader(_name: string, _value: string | string[]) { +// notImplemented("Http2ServerResponse.setHeader"); +// } + +// setTimeout(msecs: number, callback?: () => unknown) { +// this.stream.setTimeout(msecs, callback); +// } + +// get socket(): Socket /*| TlsSocket*/ { +// notImplemented("Http2ServerResponse.socket"); +// return {}; +// } + +// get statusCode(): number { +// notImplemented("Http2ServerResponse.statusCode"); +// return 0; +// } + +// get statusMessage(): string { +// notImplemented("Http2ServerResponse.statusMessage"); +// return ""; +// } + +// get stream(): Http2Stream { +// notImplemented("Http2ServerResponse.stream"); +// return new Http2Stream(); +// } + +// get writableEnded(): boolean { +// notImplemented("Http2ServerResponse.writableEnded"); +// return false; +// } + +// write( +// _chunk: string | Buffer | Uint8Array, +// _encoding: string, +// _callback: () => unknown, +// ) { +// notImplemented("Http2ServerResponse.write"); +// return this.write; +// } + +// writeContinue() { +// notImplemented("Http2ServerResponse.writeContinue"); +// } + +// writeEarlyHints(_hints: Record) { +// notImplemented("Http2ServerResponse.writeEarlyHints"); +// } + +// writeHead( +// _statusCode: number, +// _statusMessage: string, +// _headers: Record, +// ) { +// notImplemented("Http2ServerResponse.writeHead"); +// } +// } + export default { createServer, createSecureServer, diff --git a/main.mjs b/main.mjs new file mode 100644 index 00000000000000..3e3f49bce7cabc --- /dev/null +++ b/main.mjs @@ -0,0 +1,7 @@ +const { addTrailers } = Deno[Deno.internal]; + +Deno.serve(async (req) => { + const res = new Response("Hello World"); + addTrailers(res, { "X-Foo": "bar" }); + return res; +}); diff --git a/tests/unit/serve_test.ts b/tests/unit/serve_test.ts index 5d83aa5fc66590..635f164e49dfc2 100644 --- a/tests/unit/serve_test.ts +++ b/tests/unit/serve_test.ts @@ -3708,7 +3708,7 @@ Deno.test( // TODO(mmastrac): This test should eventually use fetch, when we support trailers there. // This test is ignored because it's flaky and relies on cURL's verbose output. Deno.test( - { permissions: { net: true, run: true, read: true }, ignore: true }, + { permissions: { net: true, run: true, read: true }, ignore: false }, async function httpServerTrailers() { const ac = new AbortController(); const { resolve } = Promise.withResolvers();