From 186f7484da3116aeda474f7f529d417ee542b450 Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Thu, 5 Sep 2024 13:30:18 +0900 Subject: [PATCH] fix(ext/node): close upgraded socket when the underlying http connection is closed (#25387) This change fixes the handling of upgraded socket from `node:http` module. In `op_node_http_fetch_response_upgrade`, we create DuplexStream paired with `hyper::upgrade::Upgraded`. When the connection is closed from the server, the read result from `Upgraded` becomes 0. However because we don't close the paired DuplexStream at that point, the Socket object in JS side keeps alive even after the server closed. That caused the issue #20179 This change fixes it by closing the paired DuplexStream when the `Upgraded` stream returns 0 read result. closes #20179 --- ext/node/ops/http.rs | 1 + tests/unit_node/http_test.ts | 68 ++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 4b1f99ec045fe9..773902dedfdc28 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -272,6 +272,7 @@ pub async fn op_node_http_fetch_response_upgrade( loop { let read = upgraded_rx.read(&mut buf).await?; if read == 0 { + read_tx.shutdown().await?; break; } read_tx.write_all(&buf[..read]).await?; diff --git a/tests/unit_node/http_test.ts b/tests/unit_node/http_test.ts index 7f5e74bf5dba9e..f85b1466b5bc3b 100644 --- a/tests/unit_node/http_test.ts +++ b/tests/unit_node/http_test.ts @@ -13,6 +13,7 @@ import { text } from "node:stream/consumers"; import { assert, assertEquals, fail } from "@std/assert"; import { assertSpyCalls, spy } from "@std/testing/mock"; import { fromFileUrl, relative } from "@std/path"; +import { retry } from "@std/async/retry"; import { gzip } from "node:zlib"; import { Buffer } from "node:buffer"; @@ -1604,3 +1605,70 @@ Deno.test("[node/http] In ClientRequest, option.hostname has precedence over opt await responseReceived.promise; }); + +Deno.test("[node/http] upgraded socket closes when the server closed without closing handshake", async () => { + const clientSocketClosed = Promise.withResolvers(); + const serverProcessClosed = Promise.withResolvers(); + + // Uses the server in different process to shutdown it without closing handshake + const server = ` + Deno.serve({ port: 1337 }, (req) => { + if (req.headers.get("upgrade") != "websocket") { + return new Response("ok"); + } + console.log("upgrade on server"); + const { socket, response } = Deno.upgradeWebSocket(req); + socket.addEventListener("message", (event) => { + console.log("server received", event.data); + socket.send("pong"); + }); + return response; + }); + `; + + const p = new Deno.Command("deno", { args: ["eval", server] }).spawn(); + + // Wait for the server to respond + await retry(async () => { + const resp = await fetch("http://localhost:1337"); + const _text = await resp.text(); + }); + + const options = { + port: 1337, + host: "127.0.0.1", + headers: { + "Connection": "Upgrade", + "Upgrade": "websocket", + "Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==", + }, + }; + + http.request(options).on("upgrade", (_res, socket) => { + socket.on("close", () => { + console.log("client socket closed"); + clientSocketClosed.resolve(); + }); + socket.on("data", async (data) => { + // receives pong message + assertEquals(data, Buffer.from("8104706f6e67", "hex")); + + p.kill(); + await p.status; + + console.log("process closed"); + serverProcessClosed.resolve(); + + // sending some additional message + socket.write(Buffer.from("81847de88e01", "hex")); + socket.write(Buffer.from("0d81e066", "hex")); + }); + + // sending ping message + socket.write(Buffer.from("81847de88e01", "hex")); + socket.write(Buffer.from("0d81e066", "hex")); + }).end(); + + await clientSocketClosed.promise; + await serverProcessClosed.promise; +});