diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 4b1f99ec045fe9..773902dedfdc28 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -272,6 +272,7 @@ pub async fn op_node_http_fetch_response_upgrade( loop { let read = upgraded_rx.read(&mut buf).await?; if read == 0 { + read_tx.shutdown().await?; break; } read_tx.write_all(&buf[..read]).await?; diff --git a/tests/unit_node/http_test.ts b/tests/unit_node/http_test.ts index 7f5e74bf5dba9e..f85b1466b5bc3b 100644 --- a/tests/unit_node/http_test.ts +++ b/tests/unit_node/http_test.ts @@ -13,6 +13,7 @@ import { text } from "node:stream/consumers"; import { assert, assertEquals, fail } from "@std/assert"; import { assertSpyCalls, spy } from "@std/testing/mock"; import { fromFileUrl, relative } from "@std/path"; +import { retry } from "@std/async/retry"; import { gzip } from "node:zlib"; import { Buffer } from "node:buffer"; @@ -1604,3 +1605,70 @@ Deno.test("[node/http] In ClientRequest, option.hostname has precedence over opt await responseReceived.promise; }); + +Deno.test("[node/http] upgraded socket closes when the server closed without closing handshake", async () => { + const clientSocketClosed = Promise.withResolvers(); + const serverProcessClosed = Promise.withResolvers(); + + // Uses the server in different process to shutdown it without closing handshake + const server = ` + Deno.serve({ port: 1337 }, (req) => { + if (req.headers.get("upgrade") != "websocket") { + return new Response("ok"); + } + console.log("upgrade on server"); + const { socket, response } = Deno.upgradeWebSocket(req); + socket.addEventListener("message", (event) => { + console.log("server received", event.data); + socket.send("pong"); + }); + return response; + }); + `; + + const p = new Deno.Command("deno", { args: ["eval", server] }).spawn(); + + // Wait for the server to respond + await retry(async () => { + const resp = await fetch("http://localhost:1337"); + const _text = await resp.text(); + }); + + const options = { + port: 1337, + host: "127.0.0.1", + headers: { + "Connection": "Upgrade", + "Upgrade": "websocket", + "Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==", + }, + }; + + http.request(options).on("upgrade", (_res, socket) => { + socket.on("close", () => { + console.log("client socket closed"); + clientSocketClosed.resolve(); + }); + socket.on("data", async (data) => { + // receives pong message + assertEquals(data, Buffer.from("8104706f6e67", "hex")); + + p.kill(); + await p.status; + + console.log("process closed"); + serverProcessClosed.resolve(); + + // sending some additional message + socket.write(Buffer.from("81847de88e01", "hex")); + socket.write(Buffer.from("0d81e066", "hex")); + }); + + // sending ping message + socket.write(Buffer.from("81847de88e01", "hex")); + socket.write(Buffer.from("0d81e066", "hex")); + }).end(); + + await clientSocketClosed.promise; + await serverProcessClosed.promise; +});