From 0d2eba4b3e890e4a1fda1f430c92ad514332e790 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 13 Jun 2023 14:11:27 +0200 Subject: [PATCH] fix(ext/node): handle 'upgrade' responses (#19412) This commit adds support for "upgrade" events in "node:http" "ClientRequest". Currently only "Websocket" upgrades are handled. Thanks to this change package like "npm:puppeteer" and "npm:discord" should work. Closes https://github.com/denoland/deno/issues/18913 Closes https://github.com/denoland/deno/issues/17847 --- cli/tests/unit_node/http_test.ts | 50 ++++++++ ext/fetch/26_fetch.js | 2 +- ext/fetch/lib.rs | 191 ++++++++++++++++++++++++++++--- ext/node/polyfills/http.ts | 74 +++++++++++- 4 files changed, 298 insertions(+), 19 deletions(-) diff --git a/cli/tests/unit_node/http_test.ts b/cli/tests/unit_node/http_test.ts index 0d15bf8897339a..4732781f8b2984 100644 --- a/cli/tests/unit_node/http_test.ts +++ b/cli/tests/unit_node/http_test.ts @@ -649,3 +649,53 @@ Deno.test("[node/http] HTTPS server", async () => { await Promise.all([promise, promise2]); client.close(); }); + +Deno.test( + "[node/http] client upgrade", + { permissions: { net: true } }, + async () => { + const promise = deferred(); + const server = http.createServer((_req, res) => { + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("okay"); + }); + // @ts-ignore it's a socket for real + let serverSocket; + server.on("upgrade", (_req, socket, _head) => { + socket.write( + "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + + "Upgrade: WebSocket\r\n" + + "Connection: Upgrade\r\n" + + "\r\n", + ); + serverSocket = socket; + }); + + // Now that server is running + server.listen(1337, "127.0.0.1", () => { + // make a request + const options = { + port: 1337, + host: "127.0.0.1", + headers: { + "Connection": "Upgrade", + "Upgrade": "websocket", + }, + }; + + const req = http.request(options); + req.end(); + + req.on("upgrade", (_res, socket, _upgradeHead) => { + socket.end(); + // @ts-ignore it's a socket for real + serverSocket!.end(); + server.close(() => { + promise.resolve(); + }); + }); + }); + + await promise; + }, +); diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 5084fab3433df6..0dc06db021905c 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -86,7 +86,7 @@ function opFetch(method, url, headers, clientRid, hasBody, bodyLength, body) { * @returns {Promise<{ status: number, statusText: string, headers: [string, string][], url: string, responseRid: number }>} */ function opFetchSend(rid) { - return core.opAsync("op_fetch_send", rid); + return core.opAsync("op_fetch_send", rid, true); } /** diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index a36512c774235d..ded69b2c42aebd 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -23,6 +23,7 @@ use deno_core::op; use deno_core::BufView; use deno_core::WriteOutcome; +use deno_core::task::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -58,6 +59,8 @@ use reqwest::RequestBuilder; use reqwest::Response; use serde::Deserialize; use serde::Serialize; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; // Re-export reqwest and data_url @@ -109,6 +112,8 @@ deno_core::extension!(deno_fetch, ops = [ op_fetch, op_fetch_send, + op_fetch_response_into_byte_stream, + op_fetch_response_upgrade, op_fetch_custom_client, ], esm = [ @@ -414,12 +419,15 @@ pub struct FetchResponse { pub url: String, pub response_rid: ResourceId, pub content_length: Option, + pub remote_addr_ip: Option, + pub remote_addr_port: Option, } #[op] pub async fn op_fetch_send( state: Rc>, rid: ResourceId, + into_byte_stream: bool, ) -> Result { let request = state .borrow_mut() @@ -436,7 +444,6 @@ pub async fn op_fetch_send( Err(_) => return Err(type_error("request was cancelled")), }; - //debug!("Fetch response {}", url); let status = res.status(); let url = res.url().to_string(); let mut res_headers = Vec::new(); @@ -445,29 +452,175 @@ pub async fn op_fetch_send( } let content_length = res.content_length(); + let remote_addr = res.remote_addr(); + let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { + (Some(addr.ip().to_string()), Some(addr.port())) + } else { + (None, None) + }; - let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let rid = state - .borrow_mut() - .resource_table - .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream.peekable()), - cancel: CancelHandle::default(), - size: content_length, - }); + let response_rid = if !into_byte_stream { + state + .borrow_mut() + .resource_table + .add(FetchResponseResource { + response: res, + size: content_length, + }) + } else { + let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + state + .borrow_mut() + .resource_table + .add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream.peekable()), + cancel: CancelHandle::default(), + size: content_length, + }) + }; Ok(FetchResponse { status: status.as_u16(), status_text: status.canonical_reason().unwrap_or("").to_string(), headers: res_headers, url, - response_rid: rid, + response_rid, content_length, + remote_addr_ip, + remote_addr_port, }) } +#[op] +pub fn op_fetch_response_into_byte_stream( + state: &mut OpState, + rid: ResourceId, +) -> Result { + let raw_response = state.resource_table.take::(rid)?; + let raw_response = Rc::try_unwrap(raw_response) + .expect("Someone is holding onto FetchResponseResource"); + let stream: BytesStream = + Box::pin(raw_response.response.bytes_stream().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + + let rid = state.resource_table.add(FetchResponseBodyResource { + reader: AsyncRefCell::new(stream.peekable()), + cancel: CancelHandle::default(), + size: raw_response.size, + }); + + Ok(rid) +} + +#[op] +pub async fn op_fetch_response_upgrade( + state: Rc>, + rid: ResourceId, +) -> Result { + let raw_response = state + .borrow_mut() + .resource_table + .take::(rid)?; + let raw_response = Rc::try_unwrap(raw_response) + .expect("Someone is holding onto FetchResponseResource"); + + let (read, write) = tokio::io::duplex(1024); + let (read_rx, write_tx) = tokio::io::split(read); + let (mut write_rx, mut read_tx) = tokio::io::split(write); + let upgraded = raw_response.response.upgrade().await?; + { + // Stage 3: Pump the data + let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = upgraded_rx.read(&mut buf).await?; + if read == 0 { + break; + } + read_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + spawn(async move { + let mut buf = [0; 1024]; + loop { + let read = write_rx.read(&mut buf).await?; + if read == 0 { + break; + } + upgraded_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + } + + Ok( + state + .borrow_mut() + .resource_table + .add(UpgradeStream::new(read_rx, write_tx)), + ) +} + +struct UpgradeStream { + read: AsyncRefCell>, + write: AsyncRefCell>, + cancel_handle: CancelHandle, +} + +impl UpgradeStream { + pub fn new( + read: tokio::io::ReadHalf, + write: tokio::io::WriteHalf, + ) -> Self { + Self { + read: AsyncRefCell::new(read), + write: AsyncRefCell::new(write), + cancel_handle: CancelHandle::new(), + } + } + + async fn read(self: Rc, buf: &mut [u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } + + async fn write(self: Rc, buf: &[u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } +} + +impl Resource for UpgradeStream { + fn name(&self) -> Cow { + "fetchUpgradedStream".into() + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } +} + type CancelableResponseResult = Result, Canceled>; pub struct FetchRequestResource( @@ -545,6 +698,18 @@ impl Resource for FetchRequestBodyResource { type BytesStream = Pin> + Unpin>>; +#[derive(Debug)] +pub struct FetchResponseResource { + pub response: Response, + pub size: Option, +} + +impl Resource for FetchResponseResource { + fn name(&self) -> Cow { + "fetchResponse".into() + } +} + pub struct FetchResponseBodyResource { pub reader: AsyncRefCell>, pub cancel: CancelHandle, diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index a207f57ce9573b..ff6dede3f40b32 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -32,6 +32,7 @@ import { parseUniqueHeadersOption, validateHeaderName, } from "ext:deno_node/_http_outgoing.ts"; +import { ok as assert } from "ext:deno_node/assert.ts"; import { kOutHeaders } from "ext:deno_node/internal/http.ts"; import { _checkIsHttpToken as checkIsHttpToken } from "ext:deno_node/_http_common.ts"; import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs"; @@ -39,7 +40,7 @@ import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; -import { notImplemented } from "ext:deno_node/_utils.ts"; +import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { connResetException, ERR_HTTP_HEADERS_SENT, @@ -53,6 +54,7 @@ import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js"; import { createHttpClient } from "ext:deno_fetch/22_http_client.js"; import { timerId } from "ext:deno_web/03_abort_signal.js"; import { clearTimeout as webClearTimeout } from "ext:deno_web/02_timers.js"; +import { TcpConn } from "ext:deno_net/01_net.js"; enum STATUS_CODES { /** RFC 7231, 6.2.1 */ @@ -502,7 +504,7 @@ class ClientRequest extends OutgoingMessage { } if (options!.createConnection) { - notImplemented("ClientRequest.options.createConnection"); + warnNotImplemented("ClientRequest.options.createConnection"); } if (options!.lookup) { @@ -618,7 +620,13 @@ class ClientRequest extends OutgoingMessage { (async () => { try { const [res, _] = await Promise.all([ - core.opAsync("op_fetch_send", this._req.requestRid), + core.opAsync( + "op_fetch_send", + this._req.requestRid, + /* false because we want to have access to actual Response, + not the bytes stream of response (because we need to handle upgrades) */ + false, + ), (async () => { if (this._bodyWriteRid) { try { @@ -656,18 +664,74 @@ class ClientRequest extends OutgoingMessage { incoming.url = res.url; incoming.statusCode = res.status; incoming.statusMessage = res.statusText; + incoming.upgrade = null; + + for (const [key, _value] of res.headers) { + if (key.toLowerCase() === "upgrade") { + incoming.upgrade = true; + break; + } + } incoming._addHeaderLines( res.headers, Object.entries(res.headers).flat().length, ); - incoming._bodyRid = res.responseRid; if (this._req.cancelHandleRid !== null) { core.tryClose(this._req.cancelHandleRid); } - this.emit("response", incoming); + if (incoming.upgrade) { + if (this.listenerCount("upgrade") === 0) { + // No listeners, so we got nothing to do + // destroy? + return; + } + + if (this.method === "CONNECT") { + throw new Error("not implemented CONNECT"); + } + + const upgradeRid = await core.opAsync( + "op_fetch_response_upgrade", + res.responseRid, + ); + assert(typeof res.remoteAddrIp !== "undefined"); + assert(typeof res.remoteAddrIp !== "undefined"); + const conn = new TcpConn( + upgradeRid, + { + transport: "tcp", + hostname: res.remoteAddrIp, + port: res.remoteAddrIp, + }, + // TODO(bartlomieju): figure out actual values + { + transport: "tcp", + hostname: "127.0.0.1", + port: 80, + }, + ); + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + + this.upgradeOrConnect = true; + + this.emit("upgrade", incoming, socket, Buffer.from([])); + this.destroyed = true; + this._closed = true; + this.emit("close"); + } else { + { + const responseRid = core.ops.op_fetch_response_into_byte_stream( + res.responseRid, + ); + incoming._bodyRid = responseRid; + } + this.emit("response", incoming); + } } catch (err) { if (this._req.cancelHandleRid !== null) { core.tryClose(this._req.cancelHandleRid);